#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the job queue handling.

Locking: there's a single, large lock in the L{JobQueue} class. It's
used by all other classes in this module.

@var JOBQUEUE_THREADS: the number of worker threads we start for
    processing jobs

"""

import os
import logging
import errno
import re
import time
import weakref

try:
  # pylint: disable-msg=E0611
  from pyinotify import pyinotify
except ImportError:
  import pyinotify

from ganeti import asyncnotifier
from ganeti import constants
from ganeti import serializer
from ganeti import workerpool
from ganeti import locking
from ganeti import opcodes
from ganeti import errors
from ganeti import mcpu
from ganeti import utils
from ganeti import jstore
from ganeti import rpc
from ganeti import runtime
from ganeti import netutils
from ganeti import compat


JOBQUEUE_THREADS = 25
JOBS_PER_ARCHIVE_DIRECTORY = 10000

# member lock names to be passed to @ssynchronized decorator
_LOCK = "_lock"
_QUEUE = "_queue"


class CancelJob(Exception):
  """Special exception to cancel a job.

  """


def TimeStampNow():
  """Returns the current timestamp.

  @rtype: tuple
  @return: the current time in the (seconds, microseconds) format

  """
  return utils.SplitTime(time.time())


class _QueuedOpCode(object):
  """Encapsulates an opcode object.

  @ivar log: holds the execution log and consists of tuples
  of the form C{(log_serial, timestamp, level, message)}
  @ivar input: the OpCode we encapsulate
  @ivar status: the current status
  @ivar result: the result of the LU execution
  @ivar start_timestamp: timestamp for the start of the execution
  @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation
  @ivar stop_timestamp: timestamp for the end of the execution

  """
  __slots__ = ["input", "status", "result", "log", "priority",
               "start_timestamp", "exec_timestamp", "end_timestamp",
               "__weakref__"]

  def __init__(self, op):
    """Constructor for the _QueuedOpCode.

    @type op: L{opcodes.OpCode}
    @param op: the opcode we encapsulate

    """
    self.input = op
    self.status = constants.OP_STATUS_QUEUED
    self.result = None
    self.log = []
    self.start_timestamp = None
    self.exec_timestamp = None
    self.end_timestamp = None

    # Get initial priority (it might change during the lifetime of this opcode)
    self.priority = getattr(op, "priority", constants.OP_PRIO_DEFAULT)

  @classmethod
  def Restore(cls, state):
    """Restore the _QueuedOpCode from the serialized form.

    @type state: dict
    @param state: the serialized state
    @rtype: _QueuedOpCode
    @return: a new _QueuedOpCode instance

    """
    obj = _QueuedOpCode.__new__(cls)
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
    obj.status = state["status"]
    obj.result = state["result"]
    obj.log = state["log"]
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.exec_timestamp = state.get("exec_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)
    obj.priority = state.get("priority", constants.OP_PRIO_DEFAULT)
    return obj

  def Serialize(self):
    """Serializes this _QueuedOpCode.

    @rtype: dict
    @return: the dictionary holding the serialized state

    """
    return {
      "input": self.input.__getstate__(),
      "status": self.status,
      "result": self.result,
      "log": self.log,
      "start_timestamp": self.start_timestamp,
      "exec_timestamp": self.exec_timestamp,
      "end_timestamp": self.end_timestamp,
      "priority": self.priority,
      }


class _QueuedJob(object):
  """In-memory job representation.

  This is what we use to track the user-submitted jobs. Locking must
  be taken care of by users of this class.

  @type queue: L{JobQueue}
  @ivar queue: the parent queue
  @ivar id: the job ID
  @type ops: list
  @ivar ops: the list of _QueuedOpCode that constitute the job
  @type log_serial: int
  @ivar log_serial: holds the index for the next log entry
  @ivar received_timestamp: the timestamp for when the job was received
  @ivar start_timestamp: the timestamp for start of execution
  @ivar end_timestamp: the timestamp for end of execution

  """
  # pylint: disable-msg=W0212
  __slots__ = ["queue", "id", "ops", "log_serial", "current_op",
               "received_timestamp", "start_timestamp", "end_timestamp",
               "__weakref__"]

  def __init__(self, queue, job_id, ops):
    """Constructor for the _QueuedJob.

    @type queue: L{JobQueue}
    @param queue: our parent queue
    @type job_id: job_id
    @param job_id: our job id
    @type ops: list
    @param ops: the list of opcodes we hold, which will be encapsulated
        in _QueuedOpCodes

    """
    if not ops:
      raise errors.GenericError("A job needs at least one opcode")

    self.queue = queue
    self.id = job_id
    self.ops = [_QueuedOpCode(op) for op in ops]
    self.log_serial = 0
    self.received_timestamp = TimeStampNow()
    self.start_timestamp = None
    self.end_timestamp = None

    self.current_op = None

  def __repr__(self):
    status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
              "id=%s" % self.id,
              "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]

    return "<%s at %#x>" % (" ".join(status), id(self))

  @classmethod
  def Restore(cls, queue, state):
    """Restore a _QueuedJob from serialized state:

    @type queue: L{JobQueue}
    @param queue: to which queue the restored job belongs
    @type state: dict
    @param state: the serialized state
    @rtype: _QueuedJob
    @return: the restored _QueuedJob instance

    """
    obj = _QueuedJob.__new__(cls)
    obj.queue = queue
    obj.id = state["id"]
    obj.received_timestamp = state.get("received_timestamp", None)
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)

    obj.ops = []
    obj.log_serial = 0
    for op_state in state["ops"]:
      op = _QueuedOpCode.Restore(op_state)
      for log_entry in op.log:
        obj.log_serial = max(obj.log_serial, log_entry[0])
      obj.ops.append(op)

    obj.current_op = None

    return obj

  def Serialize(self):
    """Serialize the _QueuedJob instance.

    @rtype: dict
    @return: the serialized state

    """
    return {
      "id": self.id,
      "ops": [op.Serialize() for op in self.ops],
      "start_timestamp": self.start_timestamp,
      "end_timestamp": self.end_timestamp,
      "received_timestamp": self.received_timestamp,
      }

  def CalcStatus(self):
    """Compute the status of this job.

    This function iterates over all the _QueuedOpCodes in the job and
    based on their status, computes the job status.

    The algorithm is:
      - if we find a cancelled opcode, or one that finished with an
        error, the job status will be the same
      - otherwise, the last opcode with the status one of:
          - waitlock
          - canceling
          - running

        will determine the job status

      - otherwise, it means either all opcodes are queued, or success,
        and the job status will be the same

    @return: the job status

    """
    status = constants.JOB_STATUS_QUEUED

    all_success = True
    for op in self.ops:
      if op.status == constants.OP_STATUS_SUCCESS:
        continue

      all_success = False

      if op.status == constants.OP_STATUS_QUEUED:
        pass
      elif op.status == constants.OP_STATUS_WAITLOCK:
        status = constants.JOB_STATUS_WAITLOCK
      elif op.status == constants.OP_STATUS_RUNNING:
        status = constants.JOB_STATUS_RUNNING
      elif op.status == constants.OP_STATUS_CANCELING:
        status = constants.JOB_STATUS_CANCELING
        break
      elif op.status == constants.OP_STATUS_ERROR:
        status = constants.JOB_STATUS_ERROR
        # The whole job fails if one opcode failed
        break
      elif op.status == constants.OP_STATUS_CANCELED:
        status = constants.JOB_STATUS_CANCELED
        break

    if all_success:
      status = constants.JOB_STATUS_SUCCESS

    return status

  def CalcPriority(self):
    """Gets the current priority for this job.

    Only unfinished opcodes are considered. When all are done, the default
    priority is used.

    @rtype: int

    """
    priorities = [op.priority for op in self.ops
                  if op.status not in constants.OPS_FINALIZED]

    if not priorities:
      # All opcodes are done, assume default priority
      return constants.OP_PRIO_DEFAULT

    return min(priorities)

  def GetLogEntries(self, newer_than):
    """Selectively returns the log entries.

    @type newer_than: None or int
    @param newer_than: if this is None, return all log entries,
        otherwise return only the log entries with serial higher
        than this value
    @rtype: list
    @return: the list of the log entries selected

    """
    if newer_than is None:
      serial = -1
    else:
      serial = newer_than

    entries = []
    for op in self.ops:
      entries.extend(filter(lambda entry: entry[0] > serial, op.log))

    return entries

  def GetInfo(self, fields):
    """Returns information about a job.

    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: list with one element for each field
    @raise errors.OpExecError: when an invalid field
        has been passed

    """
    row = []
    for fname in fields:
      if fname == "id":
        row.append(self.id)
      elif fname == "status":
        row.append(self.CalcStatus())
      elif fname == "ops":
        row.append([op.input.__getstate__() for op in self.ops])
      elif fname == "opresult":
        row.append([op.result for op in self.ops])
      elif fname == "opstatus":
        row.append([op.status for op in self.ops])
      elif fname == "oplog":
        row.append([op.log for op in self.ops])
      elif fname == "opstart":
        row.append([op.start_timestamp for op in self.ops])
      elif fname == "opexec":
        row.append([op.exec_timestamp for op in self.ops])
      elif fname == "opend":
        row.append([op.end_timestamp for op in self.ops])
      elif fname == "received_ts":
        row.append(self.received_timestamp)
      elif fname == "start_ts":
        row.append(self.start_timestamp)
      elif fname == "end_ts":
        row.append(self.end_timestamp)
      elif fname == "summary":
        row.append([op.input.Summary() for op in self.ops])
      else:
        raise errors.OpExecError("Invalid self query field '%s'" % fname)
    return row

  def MarkUnfinishedOps(self, status, result):
    """Mark unfinished opcodes with a given status and result.

    This is a utility function for marking all running or waiting to
    be run opcodes with a given status. Opcodes which are already
    finalised are not changed.

    @param status: a given opcode status
    @param result: the opcode result

    """
    not_marked = True
    for op in self.ops:
      if op.status in constants.OPS_FINALIZED:
        assert not_marked, "Finalized opcodes found after non-finalized ones"
        continue
      op.status = status
      op.result = result
      not_marked = False

  def Cancel(self):
    """Marks job as canceled/-ing if possible.

    @rtype: tuple; (bool, string)
    @return: Boolean describing whether job was successfully canceled or marked
      as canceling and a text message

    """
    status = self.CalcStatus()

    if status not in (constants.JOB_STATUS_QUEUED,
                      constants.JOB_STATUS_WAITLOCK):
      logging.debug("Job %s is no longer waiting in the queue", self.id)
      return (False, "Job %s is no longer waiting in the queue" % self.id)

    if status == constants.JOB_STATUS_QUEUED:
      self.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
                             "Job canceled by request")
      msg = "Job %s canceled" % self.id

    elif status == constants.JOB_STATUS_WAITLOCK:
      # The worker will notice the new status and cancel the job
      self.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
      msg = "Job %s will be canceled" % self.id

    return (True, msg)


class _OpExecCallbacks(mcpu.OpExecCbBase):
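  """Opcode execution callbacks.

  Used to update an opcode's status, timestamps and log entries in the
  queue while it is being executed.

  """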
  def __init__(self, queue, job, op):
    """Initializes this class.

    @type queue: L{JobQueue}
    @param queue: Job queue
    @type job: L{_QueuedJob}
    @param job: Job object
    @type op: L{_QueuedOpCode}
    @param op: OpCode

    """
    assert queue, "Queue is missing"
    assert job, "Job is missing"
    assert op, "Opcode is missing"

    self._queue = queue
    self._job = job
    self._op = op

  def _CheckCancel(self):
    """Raises an exception to cancel the job if asked to.

    """
    # Cancel here if we were asked to
    if self._op.status == constants.OP_STATUS_CANCELING:
      logging.debug("Canceling opcode")
      raise CancelJob()

  @locking.ssynchronized(_QUEUE, shared=1)
  def NotifyStart(self):
    """Mark the opcode as running, not lock-waiting.

    This is called from the mcpu code as a notifier function, when the LU is
    finally about to start the Exec() method. Of course, to have end-user
    visible results, the opcode must be initially (before calling into
    Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.

    """
    assert self._op in self._job.ops
    assert self._op.status in (constants.OP_STATUS_WAITLOCK,
                               constants.OP_STATUS_CANCELING)

    # Cancel here if we were asked to
    self._CheckCancel()

    logging.debug("Opcode is now running")

    self._op.status = constants.OP_STATUS_RUNNING
    self._op.exec_timestamp = TimeStampNow()

    # And finally replicate the job status
    self._queue.UpdateJobUnlocked(self._job)

  @locking.ssynchronized(_QUEUE, shared=1)
  def _AppendFeedback(self, timestamp, log_type, log_msg):
    """Internal feedback append function, with locks

    """
    self._job.log_serial += 1
    self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))
    self._queue.UpdateJobUnlocked(self._job, replicate=False)

  def Feedback(self, *args):
    """Append a log entry.

    """
    assert len(args) < 3

    if len(args) == 1:
      log_type = constants.ELOG_MESSAGE
      log_msg = args[0]
    else:
      (log_type, log_msg) = args

    # The time is split to make serialization easier and not lose
    # precision.
    timestamp = utils.SplitTime(time.time())
    self._AppendFeedback(timestamp, log_type, log_msg)

  def CheckCancel(self):
    """Check whether job has been cancelled.

    """
    assert self._op.status in (constants.OP_STATUS_WAITLOCK,
                               constants.OP_STATUS_CANCELING)

    # Cancel here if we were asked to
    self._CheckCancel()


class _JobChangesChecker(object):
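  """Checks whether a job's status, fields or log entries have changed.

  """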
  def __init__(self, fields, prev_job_info, prev_log_serial):
    """Initializes this class.

    @type fields: list of strings
    @param fields: Fields requested by LUXI client
    @type prev_job_info: string
    @param prev_job_info: previous job info, as passed by the LUXI client
    @type prev_log_serial: string
    @param prev_log_serial: previous job serial, as passed by the LUXI client

    """
    self._fields = fields
    self._prev_job_info = prev_job_info
    self._prev_log_serial = prev_log_serial

  def __call__(self, job):
    """Checks whether job has changed.

    @type job: L{_QueuedJob}
    @param job: Job object

    """
    status = job.CalcStatus()
    job_info = job.GetInfo(self._fields)
    log_entries = job.GetLogEntries(self._prev_log_serial)

    # Serializing and deserializing data can cause type changes (e.g. from
    # tuple to list) or precision loss. We're doing it here so that we get
    # the same modifications as the data received from the client. Without
    # this, the comparison afterwards might fail without the data being
    # significantly different.
    # TODO: we just deserialized from disk, investigate how to make sure that
    # the job info and log entries are compatible to avoid this further step.
    # TODO: Doing something like in testutils.py:UnifyValueType might be more
    # efficient, though floats will be tricky
    job_info = serializer.LoadJson(serializer.DumpJson(job_info))
    log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))

    # Don't even try to wait if the job is no longer running, there will be
    # no changes.
    if (status not in (constants.JOB_STATUS_QUEUED,
                       constants.JOB_STATUS_RUNNING,
                       constants.JOB_STATUS_WAITLOCK) or
        job_info != self._prev_job_info or
        (log_entries and self._prev_log_serial != log_entries[0][0])):
      logging.debug("Job %s changed", job.id)
      return (job_info, log_entries)

    return None


class _JobFileChangesWaiter(object):
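  """Waits for changes on a single job file, using inotify.

  """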
  def __init__(self, filename):
    """Initializes this class.

    @type filename: string
    @param filename: Path to job file
    @raises errors.InotifyError: if the notifier cannot be setup

    """
    self._wm = pyinotify.WatchManager()
    self._inotify_handler = \
      asyncnotifier.SingleFileEventHandler(self._wm, self._OnInotify, filename)
    self._notifier = \
      pyinotify.Notifier(self._wm, default_proc_fun=self._inotify_handler)
    try:
      self._inotify_handler.enable()
    except Exception:
      # pyinotify doesn't close file descriptors automatically
      self._notifier.stop()
      raise

  def _OnInotify(self, notifier_enabled):
    """Callback for inotify.

    """
    if not notifier_enabled:
      self._inotify_handler.enable()

  def Wait(self, timeout):
    """Waits for the job file to change.

    @type timeout: float
    @param timeout: Timeout in seconds
    @return: Whether there have been events

    """
    assert timeout >= 0
    have_events = self._notifier.check_events(timeout * 1000)
    if have_events:
      self._notifier.read_events()
    self._notifier.process_events()
    return have_events

  def Close(self):
    """Closes underlying notifier and its file descriptor.

    """
    self._notifier.stop()


class _JobChangesWaiter(object):
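  """Waits for changes on a job, setting up inotify only when needed.

  """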
  def __init__(self, filename):
    """Initializes this class.

    @type filename: string
    @param filename: Path to job file

    """
    self._filewaiter = None
    self._filename = filename

  def Wait(self, timeout):
    """Waits for a job to change.

    @type timeout: float
    @param timeout: Timeout in seconds
    @return: Whether there have been events

    """
    if self._filewaiter:
      return self._filewaiter.Wait(timeout)

    # Lazy setup: Avoid inotify setup cost when job file has already changed.
    # If this point is reached, return immediately and let caller check the job
    # file again in case there were changes since the last check. This avoids a
    # race condition.
    self._filewaiter = _JobFileChangesWaiter(self._filename)

    return True

  def Close(self):
    """Closes underlying waiter.

    """
    if self._filewaiter:
      self._filewaiter.Close()


class _WaitForJobChangesHelper(object):
  """Helper class using inotify to wait for changes in a job file.

  This class takes a previous job status and serial, and alerts the client when
  the current job status has changed.

  """
  @staticmethod
  def _CheckForChanges(job_load_fn, check_fn):
    job = job_load_fn()
    if not job:
      raise errors.JobLost()

    result = check_fn(job)
    if result is None:
      raise utils.RetryAgain()

    return result

  def __call__(self, filename, job_load_fn,
               fields, prev_job_info, prev_log_serial, timeout):
    """Waits for changes on a job.

    @type filename: string
    @param filename: File on which to wait for changes
    @type job_load_fn: callable
    @param job_load_fn: Function to load job
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait in seconds

    """
    try:
      check_fn = _JobChangesChecker(fields, prev_job_info, prev_log_serial)
      waiter = _JobChangesWaiter(filename)
      try:
        return utils.Retry(compat.partial(self._CheckForChanges,
                                          job_load_fn, check_fn),
                           utils.RETRY_REMAINING_TIME, timeout,
                           wait_fn=waiter.Wait)
      finally:
        waiter.Close()
    except (errors.InotifyError, errors.JobLost):
      return None
    except utils.RetryTimeout:
      return constants.JOB_NOTCHANGED


def _EncodeOpError(err):
  """Encodes an error which occurred while processing an opcode.

  """
  if isinstance(err, errors.GenericError):
    to_encode = err
  else:
    to_encode = errors.OpExecError(str(err))

  return errors.EncodeException(to_encode)


class _JobProcessor(object):
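  """Executes the opcodes of a job, one opcode per invocation.

  """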
  def __init__(self, queue, opexec_fn, job):
    """Initializes this class.

    """
    self.queue = queue
    self.opexec_fn = opexec_fn
    self.job = job

  @staticmethod
  def _FindNextOpcode(job):
    """Locates the next opcode to run.

    @type job: L{_QueuedJob}
    @param job: Job object

    """
    # Create some sort of a cache to speed up locating next opcode for future
    # lookups
    # TODO: Consider splitting _QueuedJob.ops into two separate lists, one for
    # pending and one for processed ops.
    if job.current_op is None:
      job.current_op = enumerate(job.ops)

    # Find next opcode to run
    while True:
      try:
        (idx, op) = job.current_op.next()
      except StopIteration:
        raise errors.ProgrammerError("Called for a finished job")

      if op.status == constants.OP_STATUS_RUNNING:
        # Found an opcode already marked as running
        raise errors.ProgrammerError("Called for job marked as running")

      log_prefix = "Op %s/%s" % (idx + 1, len(job.ops))
      summary = op.input.Summary()

      if op.status == constants.OP_STATUS_CANCELED:
        # Cancelled jobs are handled by the caller
        assert not compat.any(i.status != constants.OP_STATUS_CANCELED
                              for i in job.ops[idx:])

      elif op.status in constants.OPS_FINALIZED:
        # This is a job that was partially completed before master daemon
        # shutdown, so it can be expected that some opcodes are already
        # completed successfully (if any did error out, then the whole job
        # should have been aborted and not resubmitted for processing).
        logging.info("%s: opcode %s already processed, skipping",
                     log_prefix, summary)
        continue

      return (idx, op, log_prefix, summary)

  @staticmethod
  def _MarkWaitlock(job, op):
    """Marks an opcode as waiting for locks.

    The job's start timestamp is also set if necessary.

    @type job: L{_QueuedJob}
    @param job: Job object
    @type op: L{_QueuedOpCode}
    @param op: Opcode object

    """
    assert op in job.ops

    op.status = constants.OP_STATUS_WAITLOCK
    op.result = None
    op.start_timestamp = TimeStampNow()

    if job.start_timestamp is None:
      job.start_timestamp = op.start_timestamp

  def _ExecOpCodeUnlocked(self, log_prefix, op, summary):
    """Processes one opcode and returns the result.

    """
    assert op.status == constants.OP_STATUS_WAITLOCK

    try:
      # Make sure not to hold queue lock while calling ExecOpCode
      result = self.opexec_fn(op.input,
                              _OpExecCallbacks(self.queue, self.job, op))
    except CancelJob:
      logging.exception("%s: Canceling job", log_prefix)
      assert op.status == constants.OP_STATUS_CANCELING
      return (constants.OP_STATUS_CANCELING, None)
    except Exception, err: # pylint: disable-msg=W0703
      logging.exception("%s: Caught exception in %s", log_prefix, summary)
      return (constants.OP_STATUS_ERROR, _EncodeOpError(err))
    else:
      logging.debug("%s: %s successful", log_prefix, summary)
      return (constants.OP_STATUS_SUCCESS, result)

  def __call__(self):
    """Continues execution of a job.

    @rtype: bool
    @return: True if job is finished, False if processor needs to be called
             again

    """
    queue = self.queue
    job = self.job

    logging.debug("Processing job %s", job.id)

    queue.acquire(shared=1)
    try:
      opcount = len(job.ops)

      (opidx, op, log_prefix, op_summary) = self._FindNextOpcode(job)

      # Consistency check
      assert compat.all(i.status in (constants.OP_STATUS_QUEUED,
                                     constants.OP_STATUS_CANCELED)
                        for i in job.ops[opidx:])

      assert op.status in (constants.OP_STATUS_QUEUED,
                           constants.OP_STATUS_WAITLOCK,
                           constants.OP_STATUS_CANCELED)

      if op.status != constants.OP_STATUS_CANCELED:
        # Prepare to start opcode
        self._MarkWaitlock(job, op)

        assert op.status == constants.OP_STATUS_WAITLOCK
        assert job.CalcStatus() == constants.JOB_STATUS_WAITLOCK

        # Write to disk
        queue.UpdateJobUnlocked(job)

        logging.info("%s: opcode %s waiting for locks", log_prefix, op_summary)

        queue.release()
        try:
          (op_status, op_result) = \
            self._ExecOpCodeUnlocked(log_prefix, op, op_summary)
        finally:
          queue.acquire(shared=1)

        # Finalize opcode
        op.end_timestamp = TimeStampNow()
        op.status = op_status
        op.result = op_result

        if op.status == constants.OP_STATUS_CANCELING:
          assert not compat.any(i.status != constants.OP_STATUS_CANCELING
                                for i in job.ops[opidx:])
        else:
          assert op.status in constants.OPS_FINALIZED

      # Ensure all opcodes so far have been successful
      assert (opidx == 0 or
              compat.all(i.status == constants.OP_STATUS_SUCCESS
                         for i in job.ops[:opidx]))

      if op.status == constants.OP_STATUS_SUCCESS:
        finalize = False

      elif op.status == constants.OP_STATUS_ERROR:
        # Ensure failed opcode has an exception as its result
        assert errors.GetEncodedError(job.ops[opidx].result)

        to_encode = errors.OpExecError("Preceding opcode failed")
        job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
                              _EncodeOpError(to_encode))
        finalize = True

        # Consistency check
        assert compat.all(i.status == constants.OP_STATUS_ERROR and
                          errors.GetEncodedError(i.result)
                          for i in job.ops[opidx:])

      elif op.status == constants.OP_STATUS_CANCELING:
        job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
                              "Job canceled by request")
        finalize = True

      elif op.status == constants.OP_STATUS_CANCELED:
        finalize = True

      else:
        raise errors.ProgrammerError("Unknown status '%s'" % op.status)

      # Finalizing or last opcode?
      if finalize or opidx == (opcount - 1):
        # All opcodes have been run, finalize job
        job.end_timestamp = TimeStampNow()

      # Write to disk. If the job status is final, this is the final write
      # allowed. Once the file has been written, it can be archived anytime.
      queue.UpdateJobUnlocked(job)

      if finalize or opidx == (opcount - 1):
        logging.info("Finished job %s, status = %s", job.id, job.CalcStatus())
        return True

      return False
    finally:
      queue.release()


class _JobQueueWorker(workerpool.BaseWorker):
  """The actual job workers.

  """
  def RunTask(self, job): # pylint: disable-msg=W0221
    """Job executor.

    This function processes a job. It is closely tied to the L{_QueuedJob} and
    L{_QueuedOpCode} classes.

    @type job: L{_QueuedJob}
    @param job: the job to be processed

    """
    queue = job.queue
    assert queue == self.pool.queue

    self.SetTaskName("Job%s" % job.id)

    proc = mcpu.Processor(queue.context, job.id)

    if not _JobProcessor(queue, proc.ExecOpCode, job)():
      # Schedule again
      raise workerpool.DeferTask()


class _JobQueueWorkerPool(workerpool.WorkerPool):
  """Simple class implementing a job-processing workerpool.

  """
  def __init__(self, queue):
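    """Initializes this class.

    """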
    super(_JobQueueWorkerPool, self).__init__("JobQueue",
                                              JOBQUEUE_THREADS,
                                              _JobQueueWorker)
    self.queue = queue


def _RequireOpenQueue(fn):
  """Decorator for "public" functions.

  This function should be used for all 'public' functions. That is,
  functions usually called from other classes. Note that this should
  be applied only to methods (not plain functions), since it expects
  that the decorated function is called with a first argument that has
  a '_queue_filelock' attribute.

  @warning: Use this decorator only after locking.ssynchronized

  Example::
    @locking.ssynchronized(_LOCK)
    @_RequireOpenQueue
    def Example(self):
      pass

  """
  def wrapper(self, *args, **kwargs):
    # pylint: disable-msg=W0212
    assert self._queue_filelock is not None, "Queue should be open"
    return fn(self, *args, **kwargs)
  return wrapper


class JobQueue(object):
  """Queue used to manage the jobs.

  @cvar _RE_JOB_FILE: regex matching the valid job file names

  """
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)

  def __init__(self, context):
    """Constructor for JobQueue.

    The constructor will initialize the job queue object and then
    start loading the current jobs from disk, either for starting them
    (if they were queued) or for aborting them (if they were already
    running).

    @type context: GanetiContext
    @param context: the context object for access to the configuration
        data and other ganeti objects

    """
    self.context = context
    self._memcache = weakref.WeakValueDictionary()
    self._my_hostname = netutils.Hostname.GetSysName()

    # The Big JobQueue lock. If a code block or method acquires it in shared
    # mode, it must guarantee concurrency with all the code acquiring it in
    # shared mode, including itself. In order not to acquire it at all,
    # concurrency must be guaranteed with all code acquiring it in shared
    # mode and all code acquiring it exclusively.
    self._lock = locking.SharedLock("JobQueue")

    self.acquire = self._lock.acquire
    self.release = self._lock.release

    # Initialize the queue, and acquire the filelock.
    # This ensures no other process is working on the job queue.
    self._queue_filelock = jstore.InitAndVerifyQueue(must_lock=True)

    # Read serial file
    self._last_serial = jstore.ReadSerial()
    assert self._last_serial is not None, ("Serial file was modified between"
                                           " check in jstore and here")

    # Get initial list of nodes
    self._nodes = dict((n.name, n.primary_ip)
                       for n in self.context.cfg.GetAllNodesInfo().values()
                       if n.master_candidate)

    # Remove master node
    self._nodes.pop(self._my_hostname, None)

    # TODO: Check consistency across nodes

    self._queue_size = 0
    self._UpdateQueueSizeUnlocked()
    self._drained = self._IsQueueMarkedDrain()

    # Setup worker pool
    self._wpool = _JobQueueWorkerPool(self)
    try:
      self._InspectQueue()
    except:
      self._wpool.TerminateWorkers()
      raise

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def _InspectQueue(self):
    """Loads the whole job queue and resumes unfinished jobs.

    This function needs the lock here because WorkerPool.AddTask() may start a
    job while we're still doing our work.

    """
    logging.info("Inspecting job queue")

    restartjobs = []

    all_job_ids = self._GetJobIDsUnlocked()
    jobs_count = len(all_job_ids)
    lastinfo = time.time()
    for idx, job_id in enumerate(all_job_ids):
      # Give an update every 1000 jobs or 10 seconds
      if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
          idx == (jobs_count - 1)):
        logging.info("Job queue inspection: %d/%d (%0.1f %%)",
                     idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
        lastinfo = time.time()

      job = self._LoadJobUnlocked(job_id)

      # a failure in loading the job can cause 'None' to be returned
      if job is None:
        continue

      status = job.CalcStatus()

      if status in (constants.JOB_STATUS_QUEUED, ):
        restartjobs.append(job)

      elif status in (constants.JOB_STATUS_RUNNING,
                      constants.JOB_STATUS_WAITLOCK,
                      constants.JOB_STATUS_CANCELING):
        logging.warning("Unfinished job %s found: %s", job.id, job)
        job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
                              "Unclean master daemon shutdown")
        self.UpdateJobUnlocked(job)

    if restartjobs:
      logging.info("Restarting %s jobs", len(restartjobs))
      self._EnqueueJobs(restartjobs)

    logging.info("Job queue inspection finished")

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def AddNode(self, node):
    """Register a new node with the queue.

    @type node: L{objects.Node}
    @param node: the node object to be added

    """
    node_name = node.name
    assert node_name != self._my_hostname

    # Clean queue directory on added node
    result = rpc.RpcRunner.call_jobqueue_purge(node_name)
    msg = result.fail_msg
    if msg:
      logging.warning("Cannot cleanup queue directory on node %s: %s",
                      node_name, msg)

    if not node.master_candidate:
      # remove if existing, ignoring errors
      self._nodes.pop(node_name, None)
      # and skip the replication of the job ids
      return

    # Upload the whole queue excluding archived jobs
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]

    # Upload current serial file
    files.append(constants.JOB_QUEUE_SERIAL_FILE)

    for file_name in files:
      # Read file content
      content = utils.ReadFile(file_name)

      result = rpc.RpcRunner.call_jobqueue_update([node_name],
                                                  [node.primary_ip],
                                                  file_name, content)
      msg = result[node_name].fail_msg
      if msg:
        logging.error("Failed to upload file %s to node %s: %s",
                      file_name, node_name, msg)

    self._nodes[node_name] = node.primary_ip

  @locking.ssynchronized(_LOCK)
  @_RequireOpenQueue
  def RemoveNode(self, node_name):
    """Callback called when removing nodes from the cluster.

    @type node_name: str
    @param node_name: the name of the node to remove

    """
    self._nodes.pop(node_name, None)

  @staticmethod
  def _CheckRpcResult(result, nodes, failmsg):
    """Verifies the status of an RPC call.

    Since we aim to keep consistency should this node (the current
    master) fail, we will log errors if our rpc calls fail, and especially
    log the case when more than half of the nodes fail.

    @param result: the data as returned from the rpc call
    @type nodes: list
    @param nodes: the list of nodes we made the call to
    @type failmsg: str
    @param failmsg: the identifier to be used for logging

    """
    failed = []
    success = []

    for node in nodes:
      msg = result[node].fail_msg
      if msg:
        failed.append(node)
        logging.error("RPC call %s (%s) failed on node %s: %s",
                      result[node].call, failmsg, node, msg)
      else:
        success.append(node)

    # +1 for the master node
    if (len(success) + 1) < len(failed):
      # TODO: Handle failing nodes
      logging.error("More than half of the nodes failed")

  def _GetNodeIp(self):
    """Helper for returning the node name/ip list.

    @rtype: (list, list)
    @return: a tuple of two lists, the first one with the node
        names and the second one with the node addresses

    """
    # TODO: Change to "tuple(map(list, zip(*self._nodes.items())))"?
    name_list = self._nodes.keys()
    addr_list = [self._nodes[name] for name in name_list]
    return name_list, addr_list

  def _UpdateJobQueueFile(self, file_name, data, replicate):
    """Writes a file locally and then replicates it to all nodes.

    This function will replace the contents of a file on the local
    node and then replicate it to all the other nodes we have.

    @type file_name: str
    @param file_name: the path of the file to be replicated
    @type data: str
    @param data: the new contents of the file
    @type replicate: boolean
    @param replicate: whether to spread the changes to the remote nodes

    """
    getents = runtime.GetEnts()
    utils.WriteFile(file_name, data=data, uid=getents.masterd_uid,
                    gid=getents.masterd_gid)

    if replicate:
      names, addrs = self._GetNodeIp()
      result = rpc.RpcRunner.call_jobqueue_update(names, addrs, file_name, data)
      self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name)

  def _RenameFilesUnlocked(self, rename):
    """Renames a file locally and then replicate the change.

    This function will rename a file in the local queue directory
    and then replicate this rename to all the other nodes we have.

    @type rename: list of (old, new)
    @param rename: List containing tuples mapping old to new names

    """
    # Rename them locally
    for old, new in rename:
      utils.RenameFile(old, new, mkdir=True)

    # ... and on all nodes
    names, addrs = self._GetNodeIp()
    result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, rename)
    self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)

  @staticmethod
  def _FormatJobID(job_id):
    """Convert a job ID to string format.

    Currently this just does C{str(job_id)} after performing some
    checks, but if we want to change the job id format this will
    abstract this change.

    @type job_id: int or long
    @param job_id: the numeric job id
    @rtype: str
    @return: the formatted job id

    """
    if not isinstance(job_id, (int, long)):
      raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
    if job_id < 0:
      raise errors.ProgrammerError("Job ID %s is negative" % job_id)

    return str(job_id)

  @classmethod
  def _GetArchiveDirectory(cls, job_id):
    """Returns the archive directory for a job.

    @type job_id: str
    @param job_id: Job identifier
    @rtype: str
    @return: Directory name

    """
    return str(int(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)
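    # E.g. with JOBS_PER_ARCHIVE_DIRECTORY at 10000, job "12345" falls
    # into archive directory "1" (integer division)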

  def _NewSerialsUnlocked(self, count):
    """Generates a new job identifier.

    Job identifiers are unique during the lifetime of a cluster.

    @type count: integer
    @param count: how many serials to return
    @rtype: list of str
    @return: a list of job identifiers.

    """
    assert count > 0
    # New number
    serial = self._last_serial + count

    # Write to file
    self._UpdateJobQueueFile(constants.JOB_QUEUE_SERIAL_FILE,
                             "%s\n" % serial, True)

    result = [self._FormatJobID(v)
              for v in range(self._last_serial + 1, serial + 1)]
    # Keep it only if we were able to write the file
    self._last_serial = serial

    return result

  @staticmethod
  def _GetJobPath(job_id):
    """Returns the job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the job file

    """
    return utils.PathJoin(constants.QUEUE_DIR, "job-%s" % job_id)

  @classmethod
  def _GetArchivedJobPath(cls, job_id):