jqueue.py 36 KB
Newer Older
Iustin Pop's avatar
Iustin Pop committed
1
2
3
#
#

4
# Copyright (C) 2006, 2007, 2008 Google Inc.
Iustin Pop's avatar
Iustin Pop committed
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


22
23
"""Module implementing the job queue handling.

24
25
26
27
28
Locking: there's a single, large lock in the L{JobQueue} class. It's
used by all other classes in this module.

@var JOBQUEUE_THREADS: the number of worker threads we start for
    processing jobs
29
30

"""
Iustin Pop's avatar
Iustin Pop committed
31

32
import os
Michael Hanselmann's avatar
Michael Hanselmann committed
33
34
import logging
import threading
35
36
import errno
import re
37
import time
38
import weakref
Iustin Pop's avatar
Iustin Pop committed
39

Michael Hanselmann's avatar
Michael Hanselmann committed
40
from ganeti import constants
41
from ganeti import serializer
Michael Hanselmann's avatar
Michael Hanselmann committed
42
from ganeti import workerpool
43
from ganeti import opcodes
Iustin Pop's avatar
Iustin Pop committed
44
from ganeti import errors
Michael Hanselmann's avatar
Michael Hanselmann committed
45
from ganeti import mcpu
46
from ganeti import utils
47
from ganeti import jstore
48
from ganeti import rpc
Michael Hanselmann's avatar
Michael Hanselmann committed
49

50

51
JOBQUEUE_THREADS = 25
Michael Hanselmann's avatar
Michael Hanselmann committed
52

Iustin Pop's avatar
Iustin Pop committed
53

54
class CancelJob(Exception):
  """Special exception to cancel a job.

  Raised from within a worker (see L{_JobQueueWorker._NotifyStart})
  when an opcode is found in the CANCELING state; caught in
  L{_JobQueueWorker.RunTask} to mark the whole job as cancelled.

  """


60
def TimeStampNow():
  """Returns the current timestamp.

  @rtype: tuple
  @return: the current time in the (seconds, microseconds) format

  """
  # Splitting avoids float precision loss when the value is serialized
  now = time.time()
  return utils.SplitTime(now)


Michael Hanselmann's avatar
Michael Hanselmann committed
70
71
72
class _QueuedOpCode(object):
  """Encapsulates an opcode object.

  @ivar log: holds the execution log and consists of tuples
  of the form C{(log_serial, timestamp, level, message)}
  @ivar input: the OpCode we encapsulate
  @ivar status: the current status
  @ivar result: the result of the LU execution
  @ivar start_timestamp: timestamp for the start of the execution
  @ivar end_timestamp: timestamp for the end of the execution

  """
  def __init__(self, op):
    """Constructor for the _QueuedOpCode.

    @type op: L{opcodes.OpCode}
    @param op: the opcode we encapsulate

    """
    self.input = op
    self.status = constants.OP_STATUS_QUEUED
    self.result = None
    self.log = []
    self.start_timestamp = None
    self.end_timestamp = None

  @classmethod
  def Restore(cls, state):
    """Restore the _QueuedOpCode from the serialized form.

    @type state: dict
    @param state: the serialized state
    @rtype: _QueuedOpCode
    @return: a new _QueuedOpCode instance

    """
    # __new__ bypasses __init__ since all attributes come from "state"
    obj = _QueuedOpCode.__new__(cls)
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
    obj.status = state["status"]
    obj.result = state["result"]
    obj.log = state["log"]
    # timestamps may be missing in states written by older versions
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)
    return obj

  def Serialize(self):
    """Serializes this _QueuedOpCode.

    @rtype: dict
    @return: the dictionary holding the serialized state

    """
    return {
      "input": self.input.__getstate__(),
      "status": self.status,
      "result": self.result,
      "log": self.log,
      "start_timestamp": self.start_timestamp,
      "end_timestamp": self.end_timestamp,
      }
130

Michael Hanselmann's avatar
Michael Hanselmann committed
131
132
133
134

class _QueuedJob(object):
  """In-memory job representation.

  This is what we use to track the user-submitted jobs. Locking must
  be taken care of by users of this class.

  @type queue: L{JobQueue}
  @ivar queue: the parent queue
  @ivar id: the job ID
  @type ops: list
  @ivar ops: the list of _QueuedOpCode that constitute the job
  @type run_op_index: int
  @ivar run_op_index: the currently executing opcode, or -1 if
      we didn't yet start executing
  @type log_serial: int
  @ivar log_serial: holds the index for the next log entry
  @ivar received_timestamp: the timestamp for when the job was received
  @ivar start_timestamp: the timestamp for start of execution
  @ivar end_timestamp: the timestamp for end of execution
  @ivar change: a Condition variable we use for waiting for job changes

  """
  def __init__(self, queue, job_id, ops):
    """Constructor for the _QueuedJob.

    @type queue: L{JobQueue}
    @param queue: our parent queue
    @type job_id: job_id
    @param job_id: our job id
    @type ops: list
    @param ops: the list of opcodes we hold, which will be encapsulated
        in _QueuedOpCodes
    @raise Exception: if ops is empty (a job without opcodes could
        never finish)

    """
    if not ops:
      # TODO: use a better exception
      raise Exception("No opcodes")

    self.queue = queue
    self.id = job_id
    self.ops = [_QueuedOpCode(op) for op in ops]
    self.run_op_index = -1
    self.log_serial = 0
    self.received_timestamp = TimeStampNow()
    self.start_timestamp = None
    self.end_timestamp = None

    # Condition to wait for changes; built on the queue's lock so
    # notifications and job updates are serialized consistently
    self.change = threading.Condition(self.queue._lock)

  @classmethod
  def Restore(cls, queue, state):
    """Restore a _QueuedJob from serialized state:

    @type queue: L{JobQueue}
    @param queue: to which queue the restored job belongs
    @type state: dict
    @param state: the serialized state
    @rtype: _JobQueue
    @return: the restored _JobQueue instance

    """
    obj = _QueuedJob.__new__(cls)
    obj.queue = queue
    obj.id = state["id"]
    obj.run_op_index = state["run_op_index"]
    # timestamps may be missing in states written by older versions
    obj.received_timestamp = state.get("received_timestamp", None)
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)

    obj.ops = []
    obj.log_serial = 0
    for op_state in state["ops"]:
      op = _QueuedOpCode.Restore(op_state)
      # resume log numbering after the highest serial seen so far
      for log_entry in op.log:
        obj.log_serial = max(obj.log_serial, log_entry[0])
      obj.ops.append(op)

    # Condition to wait for changes
    obj.change = threading.Condition(obj.queue._lock)

    return obj

  def Serialize(self):
    """Serialize the _JobQueue instance.

    @rtype: dict
    @return: the serialized state

    """
    return {
      "id": self.id,
      "ops": [op.Serialize() for op in self.ops],
      "run_op_index": self.run_op_index,
      "start_timestamp": self.start_timestamp,
      "end_timestamp": self.end_timestamp,
      "received_timestamp": self.received_timestamp,
      }

  def CalcStatus(self):
    """Compute the status of this job.

    This function iterates over all the _QueuedOpCodes in the job and
    based on their status, computes the job status.

    The algorithm is:
      - if we find a cancelled, or finished with error, the job
        status will be the same
      - otherwise, the last opcode with the status one of:
          - waitlock
          - canceling
          - running

        will determine the job status

      - otherwise, it means either all opcodes are queued, or success,
        and the job status will be the same

    @return: the job status

    """
    status = constants.JOB_STATUS_QUEUED

    all_success = True
    for op in self.ops:
      if op.status == constants.OP_STATUS_SUCCESS:
        continue

      all_success = False

      if op.status == constants.OP_STATUS_QUEUED:
        pass
      elif op.status == constants.OP_STATUS_WAITLOCK:
        status = constants.JOB_STATUS_WAITLOCK
      elif op.status == constants.OP_STATUS_RUNNING:
        status = constants.JOB_STATUS_RUNNING
      elif op.status == constants.OP_STATUS_CANCELING:
        status = constants.JOB_STATUS_CANCELING
        break
      elif op.status == constants.OP_STATUS_ERROR:
        status = constants.JOB_STATUS_ERROR
        # The whole job fails if one opcode failed
        break
      elif op.status == constants.OP_STATUS_CANCELED:
        # Fixed: this used to assign the opcode-level constant
        # (OP_STATUS_CANCELED); the job-level constant is correct here
        status = constants.JOB_STATUS_CANCELED
        break

    if all_success:
      status = constants.JOB_STATUS_SUCCESS

    return status

  def GetLogEntries(self, newer_than):
    """Selectively returns the log entries.

    @type newer_than: None or int
    @param newer_than: if this is None, return all log entries,
        otherwise return only the log entries with serial higher
        than this value
    @rtype: list
    @return: the list of the log entries selected

    """
    if newer_than is None:
      serial = -1
    else:
      serial = newer_than

    entries = []
    for op in self.ops:
      entries.extend(filter(lambda entry: entry[0] > serial, op.log))

    return entries

306

Michael Hanselmann's avatar
Michael Hanselmann committed
307
class _JobQueueWorker(workerpool.BaseWorker):
308
309
310
  """The actual job workers.

  """
Iustin Pop's avatar
Iustin Pop committed
311
312
313
314
315
316
317
318
319
320
321
322
323
324
  def _NotifyStart(self):
    """Mark the opcode as running, not lock-waiting.

    This is called from the mcpu code as a notifier function, when the
    LU is finally about to start the Exec() method. Of course, to have
    end-user visible results, the opcode must be initially (before
    calling into Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.

    """
    assert self.queue, "Queue attribute is missing"
    assert self.opcode, "Opcode attribute is missing"

    self.queue.acquire()
    try:
325
326
327
328
329
330
331
      assert self.opcode.status in (constants.OP_STATUS_WAITLOCK,
                                    constants.OP_STATUS_CANCELING)

      # Cancel here if we were asked to
      if self.opcode.status == constants.OP_STATUS_CANCELING:
        raise CancelJob()

Iustin Pop's avatar
Iustin Pop committed
332
333
334
335
      self.opcode.status = constants.OP_STATUS_RUNNING
    finally:
      self.queue.release()

Michael Hanselmann's avatar
Michael Hanselmann committed
336
  def RunTask(self, job):
Michael Hanselmann's avatar
Michael Hanselmann committed
337
338
    """Job executor.

339
340
    This functions processes a job. It is closely tied to the _QueuedJob and
    _QueuedOpCode classes.
Michael Hanselmann's avatar
Michael Hanselmann committed
341

342
343
344
    @type job: L{_QueuedJob}
    @param job: the job to be processed

Michael Hanselmann's avatar
Michael Hanselmann committed
345
346
347
    """
    logging.debug("Worker %s processing job %s",
                  self.worker_id, job.id)
348
    proc = mcpu.Processor(self.pool.queue.context)
Iustin Pop's avatar
Iustin Pop committed
349
    self.queue = queue = job.queue
Michael Hanselmann's avatar
Michael Hanselmann committed
350
    try:
Michael Hanselmann's avatar
Michael Hanselmann committed
351
352
353
354
355
356
357
358
      try:
        count = len(job.ops)
        for idx, op in enumerate(job.ops):
          try:
            logging.debug("Op %s/%s: Starting %s", idx + 1, count, op)

            queue.acquire()
            try:
359
              assert op.status == constants.OP_STATUS_QUEUED
Michael Hanselmann's avatar
Michael Hanselmann committed
360
              job.run_op_index = idx
Iustin Pop's avatar
Iustin Pop committed
361
              op.status = constants.OP_STATUS_WAITLOCK
Michael Hanselmann's avatar
Michael Hanselmann committed
362
              op.result = None
363
              op.start_timestamp = TimeStampNow()
364
365
              if idx == 0: # first opcode
                job.start_timestamp = op.start_timestamp
Michael Hanselmann's avatar
Michael Hanselmann committed
366
367
              queue.UpdateJobUnlocked(job)

Iustin Pop's avatar
Iustin Pop committed
368
              input_opcode = op.input
Michael Hanselmann's avatar
Michael Hanselmann committed
369
370
371
            finally:
              queue.release()

372
            def _Log(*args):
373
374
375
376
377
378
379
380
381
382
383
384
385
386
              """Append a log entry.

              """
              assert len(args) < 3

              if len(args) == 1:
                log_type = constants.ELOG_MESSAGE
                log_msg = args[0]
              else:
                log_type, log_msg = args

              # The time is split to make serialization easier and not lose
              # precision.
              timestamp = utils.SplitTime(time.time())
387

388
              queue.acquire()
389
              try:
390
391
392
                job.log_serial += 1
                op.log.append((job.log_serial, timestamp, log_type, log_msg))

393
394
                job.change.notifyAll()
              finally:
395
                queue.release()
396

397
            # Make sure not to hold lock while _Log is called
Iustin Pop's avatar
Iustin Pop committed
398
399
            self.opcode = op
            result = proc.ExecOpCode(input_opcode, _Log, self._NotifyStart)
Michael Hanselmann's avatar
Michael Hanselmann committed
400
401
402
403
404

            queue.acquire()
            try:
              op.status = constants.OP_STATUS_SUCCESS
              op.result = result
405
              op.end_timestamp = TimeStampNow()
Michael Hanselmann's avatar
Michael Hanselmann committed
406
407
408
409
410
411
              queue.UpdateJobUnlocked(job)
            finally:
              queue.release()

            logging.debug("Op %s/%s: Successfully finished %s",
                          idx + 1, count, op)
412
413
414
          except CancelJob:
            # Will be handled further up
            raise
Michael Hanselmann's avatar
Michael Hanselmann committed
415
416
417
418
419
420
          except Exception, err:
            queue.acquire()
            try:
              try:
                op.status = constants.OP_STATUS_ERROR
                op.result = str(err)
421
                op.end_timestamp = TimeStampNow()
Michael Hanselmann's avatar
Michael Hanselmann committed
422
423
424
425
426
427
428
                logging.debug("Op %s/%s: Error in %s", idx + 1, count, op)
              finally:
                queue.UpdateJobUnlocked(job)
            finally:
              queue.release()
            raise

429
430
431
432
433
434
      except CancelJob:
        queue.acquire()
        try:
          queue.CancelJobUnlocked(job)
        finally:
          queue.release()
Michael Hanselmann's avatar
Michael Hanselmann committed
435
436
437
438
      except errors.GenericError, err:
        logging.exception("Ganeti exception")
      except:
        logging.exception("Unhandled exception")
Michael Hanselmann's avatar
Michael Hanselmann committed
439
    finally:
Michael Hanselmann's avatar
Michael Hanselmann committed
440
441
      queue.acquire()
      try:
442
443
        try:
          job.run_op_idx = -1
444
          job.end_timestamp = TimeStampNow()
445
446
447
448
          queue.UpdateJobUnlocked(job)
        finally:
          job_id = job.id
          status = job.CalcStatus()
Michael Hanselmann's avatar
Michael Hanselmann committed
449
450
      finally:
        queue.release()
Michael Hanselmann's avatar
Michael Hanselmann committed
451
      logging.debug("Worker %s finished job %s, status = %s",
Michael Hanselmann's avatar
Michael Hanselmann committed
452
                    self.worker_id, job_id, status)
Michael Hanselmann's avatar
Michael Hanselmann committed
453
454
455


class _JobQueueWorkerPool(workerpool.WorkerPool):
  """Simple class implementing a job-processing workerpool.

  """
  def __init__(self, queue):
    """Initializes the pool with JOBQUEUE_THREADS workers.

    @type queue: L{JobQueue}
    @param queue: the owning job queue, made available to workers
        via C{self.pool.queue}

    """
    super(_JobQueueWorkerPool, self).__init__(JOBQUEUE_THREADS,
                                              _JobQueueWorker)
    self.queue = queue
Michael Hanselmann's avatar
Michael Hanselmann committed
463
464


Michael Hanselmann's avatar
Michael Hanselmann committed
465
class JobQueue(object):
466
467
468
469
470
  """Queue used to manage the jobs.

  @cvar _RE_JOB_FILE: regex matching the valid job file names

  """
471
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
472

473
474
475
  def _RequireOpenQueue(fn):
    """Decorator for "public" functions.

    This function should be used for all 'public' functions. That is,
    functions usually called from other classes.

    @warning: Use this decorator only after utils.LockedMethod!

    Example::
      @utils.LockedMethod
      @_RequireOpenQueue
      def Example(self):
        pass

    """
    def wrapper(self, *args, **kwargs):
      # _queue_lock is set on open and is the marker for a usable queue
      assert self._queue_lock is not None, "Queue should be open"
      return fn(self, *args, **kwargs)

    return wrapper

Michael Hanselmann's avatar
Michael Hanselmann committed
493
  def __init__(self, context):
    """Constructor for JobQueue.

    The constructor will initialize the job queue object and then
    start loading the current jobs from disk, either for starting them
    (if they were queue) or for aborting them (if they were already
    running).

    @type context: GanetiContext
    @param context: the context object for access to the configuration
        data and other ganeti objects

    """
    self.context = context
    # Weak cache of loaded jobs; entries disappear once no longer referenced
    self._memcache = weakref.WeakValueDictionary()
    self._my_hostname = utils.HostInfo().name

    # Locking
    self._lock = threading.Lock()
    self.acquire = self._lock.acquire
    self.release = self._lock.release

    # Initialize
    self._queue_lock = jstore.InitAndVerifyQueue(must_lock=True)

    # Read serial file
    self._last_serial = jstore.ReadSerial()
    assert self._last_serial is not None, ("Serial file was modified between"
                                           " check in jstore and here")

    # Get initial list of nodes
    self._nodes = dict((n.name, n.primary_ip)
                       for n in self.context.cfg.GetAllNodesInfo().values()
                       if n.master_candidate)

    # Remove master node
    try:
      del self._nodes[self._my_hostname]
    except KeyError:
      pass

    # TODO: Check consistency across nodes

    # Setup worker pool
    self._wpool = _JobQueueWorkerPool(self)
    try:
      # We need to lock here because WorkerPool.AddTask() may start a job while
      # we're still doing our work.
      self.acquire()
      try:
        logging.info("Inspecting job queue")

        all_job_ids = self._GetJobIDsUnlocked()
        jobs_count = len(all_job_ids)
        lastinfo = time.time()
        for idx, job_id in enumerate(all_job_ids):
          # Give an update every 1000 jobs or 10 seconds
          if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
              idx == (jobs_count - 1)):
            logging.info("Job queue inspection: %d/%d (%0.1f %%)",
                         idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
            lastinfo = time.time()

          job = self._LoadJobUnlocked(job_id)

          # a failure in loading the job can cause 'None' to be returned
          if job is None:
            continue

          status = job.CalcStatus()

          # Queued jobs are simply resubmitted to the worker pool
          if status in (constants.JOB_STATUS_QUEUED, ):
            self._wpool.AddTask(job)

          elif status in (constants.JOB_STATUS_RUNNING,
                          constants.JOB_STATUS_WAITLOCK,
                          constants.JOB_STATUS_CANCELING):
            # Jobs caught mid-flight by a daemon restart cannot be resumed;
            # mark every opcode as failed
            logging.warning("Unfinished job %s found: %s", job.id, job)
            try:
              for op in job.ops:
                op.status = constants.OP_STATUS_ERROR
                op.result = "Unclean master daemon shutdown"
            finally:
              self.UpdateJobUnlocked(job)

        logging.info("Job queue inspection finished")
      finally:
        self.release()
    except:
      # Don't leave worker threads running if startup failed
      self._wpool.TerminateWorkers()
      raise
Michael Hanselmann's avatar
Michael Hanselmann committed
584

585
586
  @utils.LockedMethod
  @_RequireOpenQueue
  def AddNode(self, node):
    """Register a new node with the queue.

    The node's queue directory is purged first; for master candidates
    the full queue (minus archived jobs) and the serial file are then
    pushed to it.

    @type node: L{objects.Node}
    @param node: the node object to be added

    """
    node_name = node.name
    assert node_name != self._my_hostname

    # Clean queue directory on added node
    rpc.RpcRunner.call_jobqueue_purge(node_name)

    if not node.master_candidate:
      # remove if existing, ignoring errors
      self._nodes.pop(node_name, None)
      # and skip the replication of the job ids
      return

    # Upload the whole queue excluding archived jobs
    queue_files = [self._GetJobPath(job_id)
                   for job_id in self._GetJobIDsUnlocked()]

    # Upload current serial file
    queue_files.append(constants.JOB_QUEUE_SERIAL_FILE)

    for file_name in queue_files:
      # Read file content
      fd = open(file_name, "r")
      try:
        content = fd.read()
      finally:
        fd.close()

      result = rpc.RpcRunner.call_jobqueue_update([node_name],
                                                  [node.primary_ip],
                                                  file_name, content)
      if not result[node_name]:
        logging.error("Failed to upload %s to %s", file_name, node_name)

    self._nodes[node_name] = node.primary_ip
627
628
629
630

  @utils.LockedMethod
  @_RequireOpenQueue
  def RemoveNode(self, node_name):
    """Callback called when removing nodes from the cluster.

    @type node_name: str
    @param node_name: the name of the node to remove

    """
    # The queue is removed by the "leave node" RPC call; here we only
    # drop the node from our replication list, ignoring unknown names
    self._nodes.pop(node_name, None)

643
  def _CheckRpcResult(self, result, nodes, failmsg):
    """Verifies the status of an RPC call.

    Since we aim to keep consistency should this node (the current
    master) fail, we will log errors if our rpc fail, and especially
    log the case when more than half of the nodes failes.

    @param result: the data as returned from the rpc call
    @type nodes: list
    @param nodes: the list of nodes we made the call to
    @type failmsg: str
    @param failmsg: the identifier to be used for logging

    """
    success = [node for node in nodes if result[node]]
    failed = [node for node in nodes if not result[node]]

    if failed:
      logging.error("%s failed on %s", failmsg, ", ".join(failed))

    # +1 for the master node
    if (len(success) + 1) < len(failed):
      # TODO: Handle failing nodes
      logging.error("More than half of the nodes failed")

674
675
676
  def _GetNodeIp(self):
    """Helper for returning the node name/ip list.

    @rtype: (list, list)
    @return: a tuple of two lists, the first one with the node
        names and the second one with the node addresses

    """
    names = self._nodes.keys()
    addresses = [self._nodes[name] for name in names]
    return (names, addresses)

686
687
688
  def _WriteAndReplicateFileUnlocked(self, file_name, data):
    """Writes a file locally and then replicates it to all nodes.

    This function will replace the contents of a file on the local
    node and then replicate it to all the other nodes we have.

    @type file_name: str
    @param file_name: the path of the file to be replicated
    @type data: str
    @param data: the new contents of the file

    """
    # Local copy first, so the master always has the newest version
    utils.WriteFile(file_name, data=data)

    node_names, node_addrs = self._GetNodeIp()
    result = rpc.RpcRunner.call_jobqueue_update(node_names, node_addrs,
                                                file_name, data)
    self._CheckRpcResult(result, self._nodes,
                         "Updating %s" % file_name)
704

705
  def _RenameFileUnlocked(self, old, new):
    """Renames a file locally and then replicate the change.

    This function will rename a file in the local queue directory
    and then replicate this rename to all the other nodes we have.

    @type old: str
    @param old: the current name of the file
    @type new: str
    @param new: the new name of the file

    """
    # Rename locally first, then mirror the rename on all other nodes
    os.rename(old, new)

    node_names, node_addrs = self._GetNodeIp()
    result = rpc.RpcRunner.call_jobqueue_rename(node_names, node_addrs,
                                                old, new)
    self._CheckRpcResult(result, self._nodes,
                         "Moving %s to %s" % (old, new))
723

Michael Hanselmann's avatar
Michael Hanselmann committed
724
  def _FormatJobID(self, job_id):
    """Convert a job ID to string format.

    Currently this just does C{str(job_id)} after performing some
    checks, but if we want to change the job id format this will
    abstract this change.

    @type job_id: int or long
    @param job_id: the numeric job id
    @rtype: str
    @return: the formatted job id
    @raise errors.ProgrammerError: if the job id is not numeric
        or is negative

    """
    if not isinstance(job_id, (int, long)):
      raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
    if job_id < 0:
      raise errors.ProgrammerError("Job ID %s is negative" % job_id)

    return str(job_id)

744
  def _NewSerialUnlocked(self):
    """Generates a new job identifier.

    Job identifiers are unique during the lifetime of a cluster.

    @rtype: str
    @return: a string representing the job identifier.

    """
    next_serial = self._last_serial + 1

    # Persist and replicate the serial before using it; if the write
    # fails we must not hand out the new number
    self._WriteAndReplicateFileUnlocked(constants.JOB_QUEUE_SERIAL_FILE,
                                        "%s\n" % next_serial)

    # Keep it only if we were able to write the file
    self._last_serial = next_serial

    return self._FormatJobID(next_serial)
764

Michael Hanselmann's avatar
Michael Hanselmann committed
765
766
  @staticmethod
  def _GetJobPath(job_id):
    """Returns the job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the job file

    """
    filename = "job-%s" % job_id
    return os.path.join(constants.QUEUE_DIR, filename)

Michael Hanselmann's avatar
Michael Hanselmann committed
777
778
  @staticmethod
  def _GetArchivedJobPath(job_id):
    """Returns the archived job file for a give job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the archived job file

    """
    filename = "job-%s" % job_id
    return os.path.join(constants.JOB_QUEUE_ARCHIVE_DIR, filename)

Michael Hanselmann's avatar
Michael Hanselmann committed
789
790
  @classmethod
  def _ExtractJobID(cls, name):
    """Extract the job id from a filename.

    @type name: str
    @param name: the job filename
    @rtype: job id or None
    @return: the job id corresponding to the given filename,
        or None if the filename does not represent a valid
        job file

    """
    match = cls._RE_JOB_FILE.match(name)
    if match is None:
      return None
    return match.group(1)

807
808
809
810
811
812
  def _GetJobIDsUnlocked(self, archived=False):
    """Return all known job IDs.

    If the parameter archived is True, archived jobs IDs will be
    included. Currently this argument is unused.

    The method only looks at disk because it's a requirement that all
    jobs are present on disk (so in the _memcache we don't have any
    extra IDs).

    @rtype: list
    @return: the list of job IDs

    """
    return utils.NiceSort([self._ExtractJobID(name)
                           for name in self._ListJobFiles()])
824

825
  def _ListJobFiles(self):
    """Returns the list of current job files.

    @rtype: list
    @return: the list of job file names

    """
    all_files = utils.ListVisibleFiles(constants.QUEUE_DIR)
    return [name for name in all_files
            if self._RE_JOB_FILE.match(name)]

835
  def _LoadJobUnlocked(self, job_id):
    """Loads a job from the disk or memory.

    Given a job id, this will return the cached job object if
    existing, or try to load the job from the disk. If loading from
    disk, it will also add the job to the cache.

    @param job_id: the job id
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    # Fast path: the job may already be cached in memory
    job = self._memcache.get(job_id, None)
    if job:
      logging.debug("Found job %s in memcache", job_id)
      return job

    filepath = self._GetJobPath(job_id)
    logging.debug("Loading job from %s", filepath)
    try:
      fd = open(filepath, "r")
    except IOError, err:
      # A missing file simply means there is no such job; any other
      # I/O error is unexpected and must propagate
      if err.errno in (errno.ENOENT, ):
        return None
      raise
    try:
      data = serializer.LoadJson(fd.read())
    finally:
      fd.close()

    try:
      job = _QueuedJob.Restore(self, data)
    except Exception, err:
      # The on-disk data could not be turned back into a job object;
      # move the broken file out of the way so it is not retried on
      # every load, unless it already lives in the archive directory
      new_path = self._GetArchivedJobPath(job_id)
      if filepath == new_path:
        # job already archived (future case)
        logging.exception("Can't parse job %s", job_id)
      else:
        # non-archived case
        logging.exception("Can't parse job %s, will archive.", job_id)
        self._RenameFileUnlocked(filepath, new_path)
      return None

    # Cache the freshly-loaded job for subsequent lookups
    self._memcache[job_id] = job
    logging.debug("Added job %s to the cache", job_id)
    return job
881
882

  def _GetJobsUnlocked(self, job_ids):
    """Return a list of jobs based on their IDs.

    @type job_ids: list
    @param job_ids: either an empty list (meaning all jobs),
        or a list of job IDs
    @rtype: list
    @return: the list of job objects

    """
    # An empty list is a request for every known job
    wanted = job_ids or self._GetJobIDsUnlocked()
    return [self._LoadJobUnlocked(jid) for jid in wanted]
896

897
898
899
900
901
902
903
  @staticmethod
  def _IsQueueMarkedDrain():
    """Check if the queue is marked from drain.

    This currently uses the queue drain file, which makes it a
    per-node flag. In the future this can be moved to the config file.

    @rtype: boolean
    @return: True if the job queue is marked for draining

    """
    drain_file = constants.JOB_QUEUE_DRAIN_FILE
    return os.path.exists(drain_file)

910
911
912
913
914
915
916
  @staticmethod
  def SetDrainFlag(drain_flag):
    """Sets the drain flag for the queue.

    This is similar to the function L{backend.JobQueueSetDrainFlag},
    and in the future we might merge them.

    @type drain_flag: boolean
    @param drain_flag: whether to set or unset the drain flag

    """
    drain_file = constants.JOB_QUEUE_DRAIN_FILE
    if drain_flag:
      # An empty flag file is enough; only its existence matters
      utils.WriteFile(drain_file, data="", close=True)
    else:
      utils.RemoveFile(drain_file)
    return True

927
  @utils.LockedMethod
  @_RequireOpenQueue
  def SubmitJob(self, ops):
    """Create and store a new job.

    This enters the job into our job queue and also puts it on the new
    queue, in order for it to be picked up by the queue processors.

    @type ops: list
    @param ops: The list of OpCodes that will become the new job.
    @rtype: job ID
    @return: the job ID of the newly created job
    @raise errors.JobQueueDrainError: if the job is marked for draining

    """
    # Refuse new submissions while the queue is draining
    if self._IsQueueMarkedDrain():
      raise errors.JobQueueDrainError()

    # Get job identifier
    job_id = self._NewSerialUnlocked()
    job = _QueuedJob(self, job_id, ops)

    # Write to disk (and replicate) before the job becomes visible
    self.UpdateJobUnlocked(job)

    logging.debug("Adding new job %s to the cache", job_id)
    self._memcache[job_id] = job

    # Hand the job over to the worker pool for execution
    self._wpool.AddTask(job)

    return job.id
958

959
  @_RequireOpenQueue
  def UpdateJobUnlocked(self, job):
    """Update a job's on disk storage.

    After a job has been modified, this function needs to be called in
    order to write the changes to disk and replicate them to the other
    nodes.

    @type job: L{_QueuedJob}
    @param job: the changed job

    """
    job_file = self._GetJobPath(job.id)
    serialized = serializer.DumpJson(job.Serialize(), indent=False)
    logging.debug("Writing job %s to %s", job.id, job_file)
    self._WriteAndReplicateFileUnlocked(job_file, serialized)

    # Wake up anybody blocked waiting for changes on this job
    job.change.notifyAll()
978

979
  @utils.LockedMethod
  @_RequireOpenQueue
  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
                        timeout):
    """Waits for changes in a job.

    @type job_id: string
    @param job_id: Job identifier
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait
    @rtype: tuple (job info, log entries)
    @return: a tuple of the job information as required via
        the fields parameter, and the log entries as a list

        if the job has not changed and the timeout has expired,
        we instead return a special value,
        L{constants.JOB_NOTCHANGED}, which should be interpreted
        as such by the clients

    """
    logging.debug("Waiting for changes in job %s", job_id)
    end_time = time.time() + timeout
    while True:
      # Recompute the remaining time on every pass; the condition wait
      # below may return early (spurious wakeups or notifications)
      delta_time = end_time - time.time()
      if delta_time < 0:
        return constants.JOB_NOTCHANGED

      # Reload the job each iteration so updates made by the workers
      # are picked up
      job = self._LoadJobUnlocked(job_id)
      if not job:
        logging.debug("Job %s not found", job_id)
        # NOTE(review): if the job is gone on the very first iteration,
        # job_info/log_entries below are unbound and the final return
        # raises NameError — confirm whether callers can hit this
        break

      status = job.CalcStatus()
      job_info = self._GetJobInfoUnlocked(job, fields)
      log_entries = job.GetLogEntries(prev_log_serial)

      # Serializing and deserializing data can cause type changes (e.g. from
      # tuple to list) or precision loss. We're doing it here so that we get
      # the same modifications as the data received from the client. Without
      # this, the comparison afterwards might fail without the data being
      # significantly different.
      job_info = serializer.LoadJson(serializer.DumpJson(job_info))
      log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))

      if status not in (constants.JOB_STATUS_QUEUED,
                        constants.JOB_STATUS_RUNNING,
                        constants.JOB_STATUS_WAITLOCK):
        # Don't even try to wait if the job is no longer running, there will be
        # no changes.
        break

      # Return as soon as either the job info or the log stream differs
      # from what the client already has
      if (prev_job_info != job_info or
          (log_entries and prev_log_serial != log_entries[0][0])):
        break

      logging.debug("Waiting again")

      # Release the queue lock while waiting
      job.change.wait(delta_time)

    logging.debug("Job %s changed", job_id)

    return (job_info, log_entries)
1048

1049
  @utils.LockedMethod
  @_RequireOpenQueue
  def CancelJob(self, job_id):
    """Cancels a job.

    This will only succeed if the job has not started yet.

    @type job_id: string
    @param job_id: job ID of job to be cancelled.
    @rtype: tuple (bool, string)
    @return: success flag and a human-readable message

    """
    logging.info("Cancelling job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return (False, "Job %s not found" % job_id)

    job_status = job.CalcStatus()

    # Only jobs that have not started executing can be cancelled
    if job_status not in (constants.JOB_STATUS_QUEUED,
                          constants.JOB_STATUS_WAITLOCK):
      logging.debug("Job %s is no longer in the queue", job.id)
      return (False, "Job %s is no longer in the queue" % job.id)

    if job_status == constants.JOB_STATUS_QUEUED:
      # Still queued: no worker owns it, cancel it outright
      self.CancelJobUnlocked(job)
      return (True, "Job %s canceled" % job.id)

    # JOB_STATUS_WAITLOCK: a worker already owns the job; flag every
    # opcode and let the worker notice the new status and cancel it
    try:
      for op in job.ops:
        op.status = constants.OP_STATUS_CANCELING
    finally:
      self.UpdateJobUnlocked(job)
    return (True, "Job %s will be canceled" % job.id)

  @_RequireOpenQueue
  def CancelJobUnlocked(self, job):
    """Marks a job as canceled.

    All the job's opcodes are flagged as errors with a cancellation
    message, and the job is written back to disk.

    @type job: L{_QueuedJob}
    @param job: the job to mark

    """
    cancel_message = "Job canceled by request"
    try:
      for op in job.ops:
        op.status = constants.OP_STATUS_ERROR
        op.result = cancel_message
    finally:
      # Persist the new state even if the loop above failed midway
      self.UpdateJobUnlocked(job)
1098

1099
  @_RequireOpenQueue
  def _ArchiveJobUnlocked(self, job_id):
    """Archives a job.

    Finished jobs (canceled, successful or failed) are moved from the
    queue directory to the archive directory; unfinished jobs are left
    untouched.

    @type job_id: string
    @param job_id: the ID of job to be archived

    """
    logging.info("Archiving job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return

    finished_statuses = (constants.JOB_STATUS_CANCELED,
                         constants.JOB_STATUS_SUCCESS,
                         constants.JOB_STATUS_ERROR)
    if job.CalcStatus() not in finished_statuses:
      logging.debug("Job %s is not yet done", job.id)
      return

    current_path = self._GetJobPath(job.id)
    archive_path = self._GetArchivedJobPath(job.id)

    self._RenameFileUnlocked(current_path, archive_path)

    logging.debug("Successfully archived job %s", job.id)
1126

Iustin Pop's avatar
Iustin Pop committed
1127
1128
1129
1130
1131
  @utils.LockedMethod
  @_RequireOpenQueue
  def ArchiveJob(self, job_id):
    """Archives a job.

    This is just a wrapper over L{_ArchiveJobUnlocked}, taking the
    queue lock first.

    @type job_id: string
    @param job_id: Job ID of job to be archived.

    """
    return self._ArchiveJobUnlocked(job_id)

  @utils.LockedMethod
  @_RequireOpenQueue
  def AutoArchiveJobs(self, age):
    """Archives all jobs based on age.

    The method will archive all jobs which are older than the age
    parameter. For jobs that don't have an end timestamp, the start
    timestamp will be considered. The special '-1' age will cause
    archival of all jobs (that are not running or queued).

    @type age: int
    @param age: the minimum age in seconds

    """
    logging.info("Archiving jobs with age more than %s seconds", age)

    now = time.time()
    for jid in self._GetJobIDsUnlocked(archived=False):
      job = self._LoadJobUnlocked(jid)
      # _LoadJobUnlocked returns None for missing or unparseable jobs;
      # skip those instead of crashing on the attribute accesses below
      if job is None:
        continue
      # job.CalcStatus() returns a *job* status, so it must be compared
      # against JOB_STATUS_* constants (the original code used the
      # OP_STATUS_* family, which only worked because the string values
      # happen to coincide)
      if job.CalcStatus() not in (constants.JOB_STATUS_SUCCESS,
                                  constants.JOB_STATUS_ERROR,
                                  constants.JOB_STATUS_CANCELED):
        continue
      # Pick the most relevant timestamp: end time if the job finished,
      # otherwise start time, otherwise the time it was received
      if job.end_timestamp is None:
        if job.start_timestamp is None:
          job_age = job.received_timestamp
        else:
          job_age = job.start_timestamp
      else:
        job_age = job.end_timestamp

      # job_age appears to be a (seconds, microseconds) pair — only the
      # seconds component is compared; TODO confirm against _QueuedJob
      if age == -1 or now - job_age[0] > age:
        self._ArchiveJobUnlocked(jid)

Michael Hanselmann's avatar
Michael Hanselmann committed
1174
  def _GetJobInfoUnlocked(self, job, fields):
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
    """Returns information about a job.

    @type job: L{_QueuedJob}
    @param job: the job which we query
    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: list with one element for each field