jqueue.py 33.6 KB
Newer Older
Iustin Pop's avatar
Iustin Pop committed
1
2
3
#
#

4
# Copyright (C) 2006, 2007, 2008 Google Inc.
Iustin Pop's avatar
Iustin Pop committed
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


22
23
"""Module implementing the job queue handling.

24
25
26
27
28
Locking: there's a single, large lock in the L{JobQueue} class. It's
used by all other classes in this module.

@var JOBQUEUE_THREADS: the number of worker threads we start for
    processing jobs
29
30

"""
Iustin Pop's avatar
Iustin Pop committed
31

32
import os
Michael Hanselmann's avatar
Michael Hanselmann committed
33
34
import logging
import threading
35
36
import errno
import re
37
import time
38
import weakref
Iustin Pop's avatar
Iustin Pop committed
39

Michael Hanselmann's avatar
Michael Hanselmann committed
40
from ganeti import constants
41
from ganeti import serializer
Michael Hanselmann's avatar
Michael Hanselmann committed
42
from ganeti import workerpool
43
from ganeti import opcodes
Iustin Pop's avatar
Iustin Pop committed
44
from ganeti import errors
Michael Hanselmann's avatar
Michael Hanselmann committed
45
from ganeti import mcpu
46
from ganeti import utils
47
from ganeti import jstore
48
from ganeti import rpc
Michael Hanselmann's avatar
Michael Hanselmann committed
49

50
JOBQUEUE_THREADS = 25
Michael Hanselmann's avatar
Michael Hanselmann committed
51

Iustin Pop's avatar
Iustin Pop committed
52

53
def TimeStampNow():
  """Return the current time.

  @rtype: tuple
  @return: the current time in the (seconds, microseconds) format

  """
  now = time.time()
  return utils.SplitTime(now)


Michael Hanselmann's avatar
Michael Hanselmann committed
63
64
65
class _QueuedOpCode(object):
  """Encasulates an opcode object.

66
67
68
69
70
71
72
  @ivar log: holds the execution log and consists of tuples
  of the form C{(log_serial, timestamp, level, message)}
  @ivar input: the OpCode we encapsulate
  @ivar status: the current status
  @ivar result: the result of the LU execution
  @ivar start_timestamp: timestamp for the start of the execution
  @ivar stop_timestamp: timestamp for the end of the execution
73

Michael Hanselmann's avatar
Michael Hanselmann committed
74
  """
Michael Hanselmann's avatar
Michael Hanselmann committed
75
  def __init__(self, op):
76
77
78
79
80
81
    """Constructor for the _QuededOpCode.

    @type op: L{opcodes.OpCode}
    @param op: the opcode we encapsulate

    """
Michael Hanselmann's avatar
Michael Hanselmann committed
82
83
84
85
    self.input = op
    self.status = constants.OP_STATUS_QUEUED
    self.result = None
    self.log = []
86
87
    self.start_timestamp = None
    self.end_timestamp = None
88
89
90

  @classmethod
  def Restore(cls, state):
91
92
93
94
95
96
97
98
    """Restore the _QueuedOpCode from the serialized form.

    @type state: dict
    @param state: the serialized state
    @rtype: _QueuedOpCode
    @return: a new _QueuedOpCode instance

    """
Michael Hanselmann's avatar
Michael Hanselmann committed
99
100
101
102
103
    obj = _QueuedOpCode.__new__(cls)
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
    obj.status = state["status"]
    obj.result = state["result"]
    obj.log = state["log"]
104
105
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)
106
107
108
    return obj

  def Serialize(self):
109
110
111
112
113
114
    """Serializes this _QueuedOpCode.

    @rtype: dict
    @return: the dictionary holding the serialized state

    """
115
116
117
118
119
    return {
      "input": self.input.__getstate__(),
      "status": self.status,
      "result": self.result,
      "log": self.log,
120
121
      "start_timestamp": self.start_timestamp,
      "end_timestamp": self.end_timestamp,
122
      }
123

Michael Hanselmann's avatar
Michael Hanselmann committed
124
125
126
127

class _QueuedJob(object):
  """In-memory job representation.

128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
  This is what we use to track the user-submitted jobs. Locking must
  be taken care of by users of this class.

  @type queue: L{JobQueue}
  @ivar queue: the parent queue
  @ivar id: the job ID
  @type ops: list
  @ivar ops: the list of _QueuedOpCode that constitute the job
  @type run_op_index: int
  @ivar run_op_index: the currently executing opcode, or -1 if
      we didn't yet start executing
  @type log_serial: int
  @ivar log_serial: holds the index for the next log entry
  @ivar received_timestamp: the timestamp for when the job was received
  @ivar start_timestmap: the timestamp for start of execution
  @ivar end_timestamp: the timestamp for end of execution
  @ivar change: a Condition variable we use for waiting for job changes
Michael Hanselmann's avatar
Michael Hanselmann committed
145
146

  """
Michael Hanselmann's avatar
Michael Hanselmann committed
147
  def __init__(self, queue, job_id, ops):
148
149
150
151
152
153
154
155
156
157
158
    """Constructor for the _QueuedJob.

    @type queue: L{JobQueue}
    @param queue: our parent queue
    @type job_id: job_id
    @param job_id: our job id
    @type ops: list
    @param ops: the list of opcodes we hold, which will be encapsulated
        in _QueuedOpCodes

    """
Michael Hanselmann's avatar
Michael Hanselmann committed
159
    if not ops:
160
      # TODO: use a better exception
Michael Hanselmann's avatar
Michael Hanselmann committed
161
162
      raise Exception("No opcodes")

Michael Hanselmann's avatar
Michael Hanselmann committed
163
    self.queue = queue
164
    self.id = job_id
Michael Hanselmann's avatar
Michael Hanselmann committed
165
166
    self.ops = [_QueuedOpCode(op) for op in ops]
    self.run_op_index = -1
167
    self.log_serial = 0
168
169
170
    self.received_timestamp = TimeStampNow()
    self.start_timestamp = None
    self.end_timestamp = None
171
172
173

    # Condition to wait for changes
    self.change = threading.Condition(self.queue._lock)
174
175

  @classmethod
Michael Hanselmann's avatar
Michael Hanselmann committed
176
  def Restore(cls, queue, state):
177
178
179
180
181
182
183
184
185
186
    """Restore a _QueuedJob from serialized state:

    @type queue: L{JobQueue}
    @param queue: to which queue the restored job belongs
    @type state: dict
    @param state: the serialized state
    @rtype: _JobQueue
    @return: the restored _JobQueue instance

    """
Michael Hanselmann's avatar
Michael Hanselmann committed
187
188
189
190
    obj = _QueuedJob.__new__(cls)
    obj.queue = queue
    obj.id = state["id"]
    obj.run_op_index = state["run_op_index"]
191
192
193
    obj.received_timestamp = state.get("received_timestamp", None)
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)
194
195
196
197
198
199
200
201
202
203
204
205

    obj.ops = []
    obj.log_serial = 0
    for op_state in state["ops"]:
      op = _QueuedOpCode.Restore(op_state)
      for log_entry in op.log:
        obj.log_serial = max(obj.log_serial, log_entry[0])
      obj.ops.append(op)

    # Condition to wait for changes
    obj.change = threading.Condition(obj.queue._lock)

206
207
208
    return obj

  def Serialize(self):
209
210
211
212
213
214
    """Serialize the _JobQueue instance.

    @rtype: dict
    @return: the serialized state

    """
215
216
    return {
      "id": self.id,
Michael Hanselmann's avatar
Michael Hanselmann committed
217
      "ops": [op.Serialize() for op in self.ops],
218
      "run_op_index": self.run_op_index,
219
220
221
      "start_timestamp": self.start_timestamp,
      "end_timestamp": self.end_timestamp,
      "received_timestamp": self.received_timestamp,
222
223
      }

Michael Hanselmann's avatar
Michael Hanselmann committed
224
  def CalcStatus(self):
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
    """Compute the status of this job.

    This function iterates over all the _QueuedOpCodes in the job and
    based on their status, computes the job status.

    The algorithm is:
      - if we find a cancelled, or finished with error, the job
        status will be the same
      - otherwise, the last opcode with the status one of:
          - waitlock
          - running

        will determine the job status

      - otherwise, it means either all opcodes are queued, or success,
        and the job status will be the same

    @return: the job status

    """
Michael Hanselmann's avatar
Michael Hanselmann committed
245
246
247
    status = constants.JOB_STATUS_QUEUED

    all_success = True
Michael Hanselmann's avatar
Michael Hanselmann committed
248
249
    for op in self.ops:
      if op.status == constants.OP_STATUS_SUCCESS:
Michael Hanselmann's avatar
Michael Hanselmann committed
250
251
252
253
        continue

      all_success = False

Michael Hanselmann's avatar
Michael Hanselmann committed
254
      if op.status == constants.OP_STATUS_QUEUED:
Michael Hanselmann's avatar
Michael Hanselmann committed
255
        pass
Iustin Pop's avatar
Iustin Pop committed
256
257
      elif op.status == constants.OP_STATUS_WAITLOCK:
        status = constants.JOB_STATUS_WAITLOCK
Michael Hanselmann's avatar
Michael Hanselmann committed
258
      elif op.status == constants.OP_STATUS_RUNNING:
Michael Hanselmann's avatar
Michael Hanselmann committed
259
        status = constants.JOB_STATUS_RUNNING
Michael Hanselmann's avatar
Michael Hanselmann committed
260
      elif op.status == constants.OP_STATUS_ERROR:
261
262
263
        status = constants.JOB_STATUS_ERROR
        # The whole job fails if one opcode failed
        break
Michael Hanselmann's avatar
Michael Hanselmann committed
264
      elif op.status == constants.OP_STATUS_CANCELED:
265
266
        status = constants.OP_STATUS_CANCELED
        break
Michael Hanselmann's avatar
Michael Hanselmann committed
267
268
269
270
271
272

    if all_success:
      status = constants.JOB_STATUS_SUCCESS

    return status

273
  def GetLogEntries(self, newer_than):
274
275
276
277
278
279
280
281
282
283
    """Selectively returns the log entries.

    @type newer_than: None or int
    @param newer_than: if this is None, return all log enties,
        otherwise return only the log entries with serial higher
        than this value
    @rtype: list
    @return: the list of the log entries selected

    """
284
285
286
287
288
289
290
    if newer_than is None:
      serial = -1
    else:
      serial = newer_than

    entries = []
    for op in self.ops:
291
      entries.extend(filter(lambda entry: entry[0] > serial, op.log))
292
293
294

    return entries

295

Michael Hanselmann's avatar
Michael Hanselmann committed
296
class _JobQueueWorker(workerpool.BaseWorker):
297
298
299
  """The actual job workers.

  """
Iustin Pop's avatar
Iustin Pop committed
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
  def _NotifyStart(self):
    """Mark the opcode as running, not lock-waiting.

    This is called from the mcpu code as a notifier function, when the
    LU is finally about to start the Exec() method. Of course, to have
    end-user visible results, the opcode must be initially (before
    calling into Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.

    """
    assert self.queue, "Queue attribute is missing"
    assert self.opcode, "Opcode attribute is missing"

    self.queue.acquire()
    try:
      self.opcode.status = constants.OP_STATUS_RUNNING
    finally:
      self.queue.release()

Michael Hanselmann's avatar
Michael Hanselmann committed
318
  def RunTask(self, job):
Michael Hanselmann's avatar
Michael Hanselmann committed
319
320
    """Job executor.

321
322
    This functions processes a job. It is closely tied to the _QueuedJob and
    _QueuedOpCode classes.
Michael Hanselmann's avatar
Michael Hanselmann committed
323

324
325
326
    @type job: L{_QueuedJob}
    @param job: the job to be processed

Michael Hanselmann's avatar
Michael Hanselmann committed
327
328
329
    """
    logging.debug("Worker %s processing job %s",
                  self.worker_id, job.id)
330
    proc = mcpu.Processor(self.pool.queue.context)
Iustin Pop's avatar
Iustin Pop committed
331
    self.queue = queue = job.queue
Michael Hanselmann's avatar
Michael Hanselmann committed
332
    try:
Michael Hanselmann's avatar
Michael Hanselmann committed
333
334
335
336
337
338
339
340
341
      try:
        count = len(job.ops)
        for idx, op in enumerate(job.ops):
          try:
            logging.debug("Op %s/%s: Starting %s", idx + 1, count, op)

            queue.acquire()
            try:
              job.run_op_index = idx
Iustin Pop's avatar
Iustin Pop committed
342
              op.status = constants.OP_STATUS_WAITLOCK
Michael Hanselmann's avatar
Michael Hanselmann committed
343
              op.result = None
344
              op.start_timestamp = TimeStampNow()
345
346
              if idx == 0: # first opcode
                job.start_timestamp = op.start_timestamp
Michael Hanselmann's avatar
Michael Hanselmann committed
347
348
              queue.UpdateJobUnlocked(job)

Iustin Pop's avatar
Iustin Pop committed
349
              input_opcode = op.input
Michael Hanselmann's avatar
Michael Hanselmann committed
350
351
352
            finally:
              queue.release()

353
            def _Log(*args):
354
355
356
357
358
359
360
361
362
363
364
365
366
367
              """Append a log entry.

              """
              assert len(args) < 3

              if len(args) == 1:
                log_type = constants.ELOG_MESSAGE
                log_msg = args[0]
              else:
                log_type, log_msg = args

              # The time is split to make serialization easier and not lose
              # precision.
              timestamp = utils.SplitTime(time.time())
368

369
              queue.acquire()
370
              try:
371
372
373
                job.log_serial += 1
                op.log.append((job.log_serial, timestamp, log_type, log_msg))

374
375
                job.change.notifyAll()
              finally:
376
                queue.release()
377

378
            # Make sure not to hold lock while _Log is called
Iustin Pop's avatar
Iustin Pop committed
379
380
            self.opcode = op
            result = proc.ExecOpCode(input_opcode, _Log, self._NotifyStart)
Michael Hanselmann's avatar
Michael Hanselmann committed
381
382
383
384
385

            queue.acquire()
            try:
              op.status = constants.OP_STATUS_SUCCESS
              op.result = result
386
              op.end_timestamp = TimeStampNow()
Michael Hanselmann's avatar
Michael Hanselmann committed
387
388
389
390
391
392
393
394
395
396
397
398
              queue.UpdateJobUnlocked(job)
            finally:
              queue.release()

            logging.debug("Op %s/%s: Successfully finished %s",
                          idx + 1, count, op)
          except Exception, err:
            queue.acquire()
            try:
              try:
                op.status = constants.OP_STATUS_ERROR
                op.result = str(err)
399
                op.end_timestamp = TimeStampNow()
Michael Hanselmann's avatar
Michael Hanselmann committed
400
401
402
403
404
405
406
407
408
409
410
                logging.debug("Op %s/%s: Error in %s", idx + 1, count, op)
              finally:
                queue.UpdateJobUnlocked(job)
            finally:
              queue.release()
            raise

      except errors.GenericError, err:
        logging.exception("Ganeti exception")
      except:
        logging.exception("Unhandled exception")
Michael Hanselmann's avatar
Michael Hanselmann committed
411
    finally:
Michael Hanselmann's avatar
Michael Hanselmann committed
412
413
      queue.acquire()
      try:
414
415
        try:
          job.run_op_idx = -1
416
          job.end_timestamp = TimeStampNow()
417
418
419
420
          queue.UpdateJobUnlocked(job)
        finally:
          job_id = job.id
          status = job.CalcStatus()
Michael Hanselmann's avatar
Michael Hanselmann committed
421
422
      finally:
        queue.release()
Michael Hanselmann's avatar
Michael Hanselmann committed
423
      logging.debug("Worker %s finished job %s, status = %s",
Michael Hanselmann's avatar
Michael Hanselmann committed
424
                    self.worker_id, job_id, status)
Michael Hanselmann's avatar
Michael Hanselmann committed
425
426
427


class _JobQueueWorkerPool(workerpool.WorkerPool):
  """Simple class implementing a job-processing workerpool.

  """
  def __init__(self, queue):
    """Initializes the pool with L{JOBQUEUE_THREADS} job workers.

    @param queue: the L{JobQueue} this pool works for; stored so that
        the workers can reach the queue context

    """
    workerpool.WorkerPool.__init__(self, JOBQUEUE_THREADS,
                                   _JobQueueWorker)
    self.queue = queue
Michael Hanselmann's avatar
Michael Hanselmann committed
435
436


Michael Hanselmann's avatar
Michael Hanselmann committed
437
class JobQueue(object):
438
439
440
441
442
  """Queue used to manage the jobs.

  @cvar _RE_JOB_FILE: regex matching the valid job file names

  """
443
  # Regex matching valid job file names ("job-<serial>") in the queue dir
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
444

445
446
447
  def _RequireOpenQueue(fn):
    """Decorator for "public" functions.

    This function should be used for all 'public' functions. That is,
    functions usually called from other classes.

    @warning: Use this decorator only after utils.LockedMethod!

    Example::
      @utils.LockedMethod
      @_RequireOpenQueue
      def Example(self):
        pass

    """
    def wrapper(self, *args, **kwargs):
      # Refuse to run unless the queue lock file is held open
      assert self._queue_lock is not None, "Queue should be open"

      return fn(self, *args, **kwargs)

    return wrapper

Michael Hanselmann's avatar
Michael Hanselmann committed
465
  def __init__(self, context):
    """Constructor for JobQueue.

    The constructor will initialize the job queue object and then
    start loading the current jobs from disk, either for starting them
    (if they were queue) or for aborting them (if they were already
    running).

    @type context: GanetiContext
    @param context: the context object for access to the configuration
        data and other ganeti objects

    """
    self.context = context
    self._memcache = weakref.WeakValueDictionary()
    self._my_hostname = utils.HostInfo().name

    # Locking
    self._lock = threading.Lock()
    self.acquire = self._lock.acquire
    self.release = self._lock.release

    # Initialize
    self._queue_lock = jstore.InitAndVerifyQueue(must_lock=True)

    # Read serial file
    self._last_serial = jstore.ReadSerial()
    assert self._last_serial is not None, ("Serial file was modified between"
                                           " check in jstore and here")

    # Get initial list of nodes
    self._nodes = dict((n.name, n.primary_ip)
                       for n in self.context.cfg.GetAllNodesInfo().values())

    # Remove master node
    try:
      del self._nodes[self._my_hostname]
    except KeyError:
      # Fixed: deleting a missing dict key raises KeyError, not
      # ValueError as previously caught; with ValueError a missing
      # master entry would have crashed the constructor
      pass

    # TODO: Check consistency across nodes

    # Setup worker pool
    self._wpool = _JobQueueWorkerPool(self)

    # We need to lock here because WorkerPool.AddTask() may start a job while
    # we're still doing our work.
    self.acquire()
    try:
      for job in self._GetJobsUnlocked(None):
        # a failure in loading the job can cause 'None' to be returned
        if job is None:
          continue

        status = job.CalcStatus()

        if status in (constants.JOB_STATUS_QUEUED, ):
          self._wpool.AddTask(job)

        elif status in (constants.JOB_STATUS_RUNNING,
                        constants.JOB_STATUS_WAITLOCK):
          logging.warning("Unfinished job %s found: %s", job.id, job)
          try:
            for op in job.ops:
              op.status = constants.OP_STATUS_ERROR
              op.result = "Unclean master daemon shutdown"
          finally:
            self.UpdateJobUnlocked(job)
    finally:
      self.release()

536
537
  @utils.LockedMethod
  @_RequireOpenQueue
  def AddNode(self, node):
    """Register a new node with the queue.

    @type node: L{objects.Node}
    @param node: the node object to be added

    """
    node_name = node.name
    assert node_name != self._my_hostname

    # Clean queue directory on added node
    rpc.RpcRunner.call_jobqueue_purge(node_name)

    # Push every queue file to the new node: all non-archived jobs plus
    # the current serial file
    upload_list = [self._GetJobPath(job_id)
                   for job_id in self._GetJobIDsUnlocked()]
    upload_list.append(constants.JOB_QUEUE_SERIAL_FILE)

    for path in upload_list:
      # Read file content
      src = open(path, "r")
      try:
        data = src.read()
      finally:
        src.close()

      result = rpc.RpcRunner.call_jobqueue_update([node_name],
                                                  [node.primary_ip],
                                                  path, data)
      if not result[node_name]:
        logging.error("Failed to upload %s to %s", path, node_name)

    self._nodes[node_name] = node.primary_ip
572
573
574
575

  @utils.LockedMethod
  @_RequireOpenQueue
  def RemoveNode(self, node_name):
    """Callback called when removing nodes from the cluster.

    @type node_name: str
    @param node_name: the name of the node to remove

    """
    # The queue directory itself is removed by the "leave node" RPC
    # call; here we only forget the node's address (ignoring unknown
    # nodes, just as the previous del/KeyError form did)
    self._nodes.pop(node_name, None)

588
  def _CheckRpcResult(self, result, nodes, failmsg):
    """Verifies the status of an RPC call.

    Since we aim to keep consistency should this node (the current
    master) fail, we log which nodes the call failed on, and
    additionally log when the failures outnumber the successes
    (counting the master itself as a success).

    @param result: the data as returned from the rpc call
    @type nodes: list
    @param nodes: the list of nodes we made the call to
    @type failmsg: str
    @param failmsg: the identifier to be used for logging

    """
    failed = [node for node in nodes if not result[node]]
    success = [node for node in nodes if result[node]]

    if failed:
      logging.error("%s failed on %s", failmsg, ", ".join(failed))

    # +1 for the master node
    if (len(success) + 1) < len(failed):
      # TODO: Handle failing nodes
      logging.error("More than half of the nodes failed")

619
620
621
  def _GetNodeIp(self):
    """Helper for returning the node name/ip list.

    @rtype: (list, list)
    @return: a tuple of two lists, the first one with the node
        names and the second one with the node addresses

    """
    names = self._nodes.keys()
    # Addresses are returned in the same order as the names
    addrs = [self._nodes[name] for name in names]
    return names, addrs

631
632
633
  def _WriteAndReplicateFileUnlocked(self, file_name, data):
    """Writes a file locally and then replicates it to all nodes.

    The contents of the file are replaced on the local node first;
    afterwards the same contents are pushed to every other node we
    know about.

    @type file_name: str
    @param file_name: the path of the file to be replicated
    @type data: str
    @param data: the new contents of the file

    """
    utils.WriteFile(file_name, data=data)

    node_names, node_addrs = self._GetNodeIp()
    result = rpc.RpcRunner.call_jobqueue_update(node_names, node_addrs,
                                                file_name, data)
    self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name)
649

650
  def _RenameFileUnlocked(self, old, new):
    """Renames a file locally and then replicate the change.

    The file is renamed in the local queue directory first; afterwards
    the same rename is requested on every other node we know about.

    @type old: str
    @param old: the current name of the file
    @type new: str
    @param new: the new name of the file

    """
    os.rename(old, new)

    node_names, node_addrs = self._GetNodeIp()
    result = rpc.RpcRunner.call_jobqueue_rename(node_names, node_addrs,
                                                old, new)
    self._CheckRpcResult(result, self._nodes,
                         "Moving %s to %s" % (old, new))
668

Michael Hanselmann's avatar
Michael Hanselmann committed
669
  def _FormatJobID(self, job_id):
    """Convert a job ID to string format.

    Currently this just does C{str(job_id)} after performing some
    checks, but if we want to change the job id format this will
    abstract this change.

    @type job_id: int or long
    @param job_id: the numeric job id
    @rtype: str
    @return: the formatted job id
    @raise errors.ProgrammerError: if the job id is not numeric,
        or is negative

    """
    if not isinstance(job_id, (int, long)):
      raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
    if job_id < 0:
      raise errors.ProgrammerError("Job ID %s is negative" % job_id)

    return str(job_id)

689
  def _NewSerialUnlocked(self):
    """Generates a new job identifier.

    Job identifiers are unique during the lifetime of a cluster.

    @rtype: str
    @return: a string representing the job identifier.

    """
    next_serial = self._last_serial + 1

    # Persist (and replicate) the new serial before adopting it; if
    # the write raises, our in-memory counter must stay unchanged
    self._WriteAndReplicateFileUnlocked(constants.JOB_QUEUE_SERIAL_FILE,
                                        "%s\n" % next_serial)
    self._last_serial = next_serial

    return self._FormatJobID(next_serial)
709

Michael Hanselmann's avatar
Michael Hanselmann committed
710
711
  @staticmethod
  def _GetJobPath(job_id):
    """Returns the job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the job file

    """
    file_name = "job-%s" % job_id
    return os.path.join(constants.QUEUE_DIR, file_name)

Michael Hanselmann's avatar
Michael Hanselmann committed
722
723
  @staticmethod
  def _GetArchivedJobPath(job_id):
    """Returns the archived job file for a give job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the archived job file

    """
    file_name = "job-%s" % job_id
    return os.path.join(constants.JOB_QUEUE_ARCHIVE_DIR, file_name)

Michael Hanselmann's avatar
Michael Hanselmann committed
734
735
  @classmethod
  def _ExtractJobID(cls, name):
    """Extract the job id from a filename.

    @type name: str
    @param name: the job filename
    @rtype: job id or None
    @return: the job id corresponding to the given filename,
        or None if the filename does not represent a valid
        job file

    """
    match = cls._RE_JOB_FILE.match(name)
    if match is None:
      return None
    return match.group(1)

752
753
754
755
756
757
  def _GetJobIDsUnlocked(self, archived=False):
    """Return all known job IDs.

    If the parameter archived is True, archived jobs IDs will be
    included. Currently this argument is unused.

    The method only looks at disk because it's a requirement that all
    jobs are present on disk (so in the _memcache we don't have any
    extra IDs).

    @rtype: list
    @return: the list of job IDs

    """
    extracted = [self._ExtractJobID(name) for name in self._ListJobFiles()]
    return utils.NiceSort(extracted)
769

770
  def _ListJobFiles(self):
    """Returns the list of current job files.

    @rtype: list
    @return: the list of job file names

    """
    all_files = utils.ListVisibleFiles(constants.QUEUE_DIR)
    return [name for name in all_files
            if self._RE_JOB_FILE.match(name)]

780
  def _LoadJobUnlocked(self, job_id):
    """Loads a job from the disk or memory.

    Given a job id, this will return the cached job object if
    existing, or try to load the job from the disk. If loading from
    disk, it will also add the job to the cache.

    @param job_id: the job id
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    # Fast path: the weak-value cache keeps a job only as long as
    # someone else still references it
    job = self._memcache.get(job_id, None)
    if job:
      logging.debug("Found job %s in memcache", job_id)
      return job

    filepath = self._GetJobPath(job_id)
    logging.debug("Loading job from %s", filepath)
    try:
      fd = open(filepath, "r")
    except IOError, err:
      # A missing file means the job doesn't exist; any other I/O
      # error is propagated to the caller
      if err.errno in (errno.ENOENT, ):
        return None
      raise
    try:
      data = serializer.LoadJson(fd.read())
    finally:
      fd.close()

    try:
      job = _QueuedJob.Restore(self, data)
    except Exception, err:
      # An unparseable job file is archived (unless already archived)
      # so it won't be picked up again, and reported as missing
      new_path = self._GetArchivedJobPath(job_id)
      if filepath == new_path:
        # job already archived (future case)
        logging.exception("Can't parse job %s", job_id)
      else:
        # non-archived case
        logging.exception("Can't parse job %s, will archive.", job_id)
        self._RenameFileUnlocked(filepath, new_path)
      return None

    self._memcache[job_id] = job
    logging.debug("Added job %s to the cache", job_id)
    return job
826
827

  def _GetJobsUnlocked(self, job_ids):
    """Return a list of jobs based on their IDs.

    @type job_ids: list
    @param job_ids: either an empty list (meaning all jobs),
        or a list of job IDs
    @rtype: list
    @return: the list of job objects

    """
    # An empty (or otherwise false) argument means "all jobs"
    wanted = job_ids or self._GetJobIDsUnlocked()
    return [self._LoadJobUnlocked(jid) for jid in wanted]
841

842
843
844
845
846
847
848
  @staticmethod
  def _IsQueueMarkedDrain():
    """Check whether the queue is marked to be drained.

    This currently uses the queue drain file, which makes it a
    per-node flag. In the future this can be moved to the config file.

    @rtype: boolean
    @return: True if the job queue is marked for draining

    """
    # The mere existence of the file is the flag
    return os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)

855
856
857
858
859
860
861
  @staticmethod
  def SetDrainFlag(drain_flag):
    """Sets the drain flag for the queue.

    This is similar to the function L{backend.JobQueueSetDrainFlag},
    and in the future we might merge them.

    @type drain_flag: boolean
    @param drain_flag: whether to set or unset the drain flag

    """
    # The flag is represented by the presence of the drain file
    if drain_flag:
      utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
    else:
      utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
    return True

872
  @utils.LockedMethod
  @_RequireOpenQueue
  def SubmitJob(self, ops):
    """Create and store a new job.

    This enters the job into our job queue and also puts it on the new
    queue, in order for it to be picked up by the queue processors.

    @type ops: list
    @param ops: The list of OpCodes that will become the new job.
    @rtype: job ID
    @return: the job ID of the newly created job
    @raise errors.JobQueueDrainError: if the job is marked for draining

    """
    # Refuse new jobs while the queue is drained
    if self._IsQueueMarkedDrain():
      raise errors.JobQueueDrainError()

    # Allocate a fresh job identifier and build the job object
    job_id = self._NewSerialUnlocked()
    job = _QueuedJob(self, job_id, ops)

    # Persist the job to disk before making it visible anywhere else
    self.UpdateJobUnlocked(job)

    logging.debug("Adding new job %s to the cache", job_id)
    self._memcache[job_id] = job

    # Hand the job over to the worker pool for execution
    self._wpool.AddTask(job)

    return job.id
903

904
  @_RequireOpenQueue
  def UpdateJobUnlocked(self, job):
    """Update a job's on disk storage.

    After a job has been modified, this function needs to be called in
    order to write the changes to disk and replicate them to the other
    nodes.

    @type job: L{_QueuedJob}
    @param job: the changed job

    """
    job_file = self._GetJobPath(job.id)
    serialized = serializer.DumpJson(job.Serialize(), indent=False)
    logging.debug("Writing job %s to %s", job.id, job_file)
    self._WriteAndReplicateFileUnlocked(job_file, serialized)

    # Wake up anybody blocked on this job's condition variable so they
    # can re-check the job state
    job.change.notifyAll()
923

924
  @utils.LockedMethod
  @_RequireOpenQueue
  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
                        timeout):
    """Waits for changes in a job.

    @type job_id: string
    @param job_id: Job identifier
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait
    @rtype: tuple (job info, log entries)
    @return: a tuple of the job information as required via
        the fields parameter, and the log entries as a list

        if the job has not changed and the timeout has expired,
        we instead return a special value,
        L{constants.JOB_NOTCHANGED}, which should be interpreted
        as such by the clients

    """
    logging.debug("Waiting for changes in job %s", job_id)
    end_time = time.time() + timeout

    # Pre-initialize the result variables: if the job cannot be loaded
    # on the very first iteration, we break out of the loop before they
    # are ever assigned, and the final return statement would otherwise
    # raise UnboundLocalError.
    job_info = None
    log_entries = None

    while True:
      delta_time = end_time - time.time()
      if delta_time < 0:
        return constants.JOB_NOTCHANGED

      job = self._LoadJobUnlocked(job_id)
      if not job:
        logging.debug("Job %s not found", job_id)
        break

      status = job.CalcStatus()
      job_info = self._GetJobInfoUnlocked(job, fields)
      log_entries = job.GetLogEntries(prev_log_serial)

      # Serializing and deserializing data can cause type changes (e.g. from
      # tuple to list) or precision loss. We're doing it here so that we get
      # the same modifications as the data received from the client. Without
      # this, the comparison afterwards might fail without the data being
      # significantly different.
      job_info = serializer.LoadJson(serializer.DumpJson(job_info))
      log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))

      if status not in (constants.JOB_STATUS_QUEUED,
                        constants.JOB_STATUS_RUNNING,
                        constants.JOB_STATUS_WAITLOCK):
        # Don't even try to wait if the job is no longer running, there will be
        # no changes.
        break

      if (prev_job_info != job_info or
          (log_entries and prev_log_serial != log_entries[0][0])):
        break

      logging.debug("Waiting again")

      # Release the queue lock while waiting
      job.change.wait(delta_time)

    logging.debug("Job %s changed", job_id)

    return (job_info, log_entries)
993

994
  @utils.LockedMethod
  @_RequireOpenQueue
  def CancelJob(self, job_id):
    """Cancels a job.

    This will only succeed if the job has not started yet.

    @type job_id: string
    @param job_id: job ID of job to be cancelled.

    """
    logging.debug("Cancelling job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return

    # Only a job still waiting in the queue can be cancelled
    if job.CalcStatus() != constants.JOB_STATUS_QUEUED:
      logging.debug("Job %s is no longer in the queue", job.id)
      return

    # Mark all opcodes as failed; make sure the change hits the disk
    # even if something goes wrong while updating the opcodes
    try:
      for op in job.ops:
        op.status = constants.OP_STATUS_ERROR
        op.result = "Job cancelled by request"
    finally:
      self.UpdateJobUnlocked(job)
1022

1023
  @_RequireOpenQueue
  def _ArchiveJobUnlocked(self, job_id):
    """Archives a job.

    @type job_id: string
    @param job_id: Job ID of job to be archived.

    """
    logging.info("Archiving job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return

    # Only jobs in a final state may be moved out of the live queue
    terminal_states = (constants.JOB_STATUS_CANCELED,
                       constants.JOB_STATUS_SUCCESS,
                       constants.JOB_STATUS_ERROR)
    if job.CalcStatus() not in terminal_states:
      logging.debug("Job %s is not yet done", job.id)
      return

    # Move the job file into the archive directory
    self._RenameFileUnlocked(self._GetJobPath(job.id),
                             self._GetArchivedJobPath(job.id))

    logging.debug("Successfully archived job %s", job.id)
1050

Iustin Pop's avatar
Iustin Pop committed
1051
1052
1053
1054
1055
  @utils.LockedMethod
  @_RequireOpenQueue
  def ArchiveJob(self, job_id):
    """Archives a job.

    This is just a lock-taking wrapper over L{_ArchiveJobUnlocked}.

    @type job_id: string
    @param job_id: Job ID of job to be archived.

    """
    return self._ArchiveJobUnlocked(job_id)

  @utils.LockedMethod
  @_RequireOpenQueue
  def AutoArchiveJobs(self, age):
    """Archives all jobs based on age.

    The method will archive all jobs which are older than the age
    parameter. For jobs that don't have an end timestamp, the start
    timestamp will be considered. The special '-1' age will cause
    archival of all jobs (that are not running or queued).

    @type age: int
    @param age: the minimum age in seconds

    """
    logging.info("Archiving jobs with age more than %s seconds", age)

    now = time.time()
    for jid in self._GetJobIDsUnlocked(archived=False):
      job = self._LoadJobUnlocked(jid)
      if job is None:
        # Job file vanished or could not be parsed; _LoadJobUnlocked
        # has already logged (and possibly archived) it
        continue
      # CalcStatus returns JOB_STATUS_* values, so compare against the
      # job status constants, not the opcode (OP_STATUS_*) ones
      if job.CalcStatus() not in (constants.JOB_STATUS_SUCCESS,
                                  constants.JOB_STATUS_ERROR,
                                  constants.JOB_STATUS_CANCELED):
        continue
      # Prefer the end timestamp; fall back to start, then receive time
      if job.end_timestamp is None:
        if job.start_timestamp is None:
          job_age = job.received_timestamp
        else:
          job_age = job.start_timestamp
      else:
        job_age = job.end_timestamp

      # job_age is a (seconds, microseconds) pair; guard against jobs
      # that carry no timestamp at all
      if age == -1 or (job_age is not None and now - job_age[0] > age):
        self._ArchiveJobUnlocked(jid)

Michael Hanselmann's avatar
Michael Hanselmann committed
1098
  def _GetJobInfoUnlocked(self, job, fields):
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
    """Returns information about a job.

    @type job: L{_QueuedJob}
    @param job: the job which we query
    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: list with one element for each field
    @raise errors.OpExecError: when an invalid field
        has been passed

    """
Michael Hanselmann's avatar
Michael Hanselmann committed
1111
1112
1113
1114
1115
    row = []
    for fname in fields:
      if fname == "id":
        row.append(job.id)
      elif fname == "status":
Michael Hanselmann's avatar
Michael Hanselmann committed
1116
        row.append(job.CalcStatus())
1117
      elif fname == "ops":
Michael Hanselmann's avatar
Michael Hanselmann committed
1118
        row.append([op.input.__getstate__() for op in job.ops])
1119
      elif fname == "opresult":
Michael Hanselmann's avatar
Michael Hanselmann committed
1120
        row.append([op.result for op in job.ops])
1121
      elif fname == "opstatus":
Michael Hanselmann's avatar
Michael Hanselmann committed
1122
        row.append([op.status for op in job.ops])
1123
1124
      elif fname == "oplog":
        row.append([op.log for op in job.ops])
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
      elif fname == "opstart":
        row.append([op.start_timestamp for op in job.ops])
      elif fname == "opend":
        row.append([op.end_timestamp for op in job.ops])
      elif fname == "received_ts":
        row.append(job.received_timestamp)
      elif fname == "start_ts":
        row.append(job.start_timestamp)
      elif fname == "end_ts":
        row.append(job.end_timestamp)
1135
1136
      elif fname == "summary":
        row.append([op.input.Summary() for op in job.ops])
Michael Hanselmann's avatar
Michael Hanselmann committed
1137
1138
1139
1140
      else:
        raise errors.OpExecError("Invalid job query field '%s'" % fname)
    return row

Michael Hanselmann's avatar
Michael Hanselmann committed
1141
  @utils.LockedMethod
  @_RequireOpenQueue
  def QueryJobs(self, job_ids, fields):
    """Returns a list of jobs in queue.

    This is a wrapper of L{_GetJobsUnlocked}, which actually does the
    processing for each job.

    @type job_ids: list
    @param job_ids: sequence of job identifiers or None for all
    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: list one element per job, each element being list with
        the requested fields

    """
    result = []
    for job in self._GetJobsUnlocked(job_ids):
      # Jobs which could not be loaded are reported as None
      if job is None:
        result.append(None)
        continue
      result.append(self._GetJobInfoUnlocked(job, fields))
    return result
Michael Hanselmann's avatar
Michael Hanselmann committed
1167

1168
  @utils.LockedMethod
  @_RequireOpenQueue
  def Shutdown(self):
    """Stops the job queue.

    This shuts down all the worker threads and closes the queue.

    """
    # Stop the worker threads first
    self._wpool.TerminateWorkers()

    # Release queue ownership; no further queue operations are possible
    # after this point
    self._queue_lock.Close()
    self._queue_lock = None