#
#

# Copyright (C) 2006, 2007, 2008 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the job queue handling.

Locking: there's a single, large lock in the L{JobQueue} class. It's
used by all other classes in this module.
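
Example of the locking convention used throughout this module
(an illustrative sketch, not real code from this file)::

  queue.acquire()
  try:
    ...  # examine or modify job state
  finally:
    queue.release()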

@var JOBQUEUE_THREADS: the number of worker threads we start for
    processing jobs

"""

import os
import logging
import threading
import errno
import re
import time
import weakref

from ganeti import constants
from ganeti import serializer
from ganeti import workerpool
from ganeti import opcodes
from ganeti import errors
from ganeti import mcpu
from ganeti import utils
from ganeti import jstore
from ganeti import rpc


JOBQUEUE_THREADS = 25
JOBS_PER_ARCHIVE_DIRECTORY = 10000


class CancelJob(Exception):
  """Special exception to cancel a job.

  """


def TimeStampNow():
  """Returns the current timestamp.

  @rtype: tuple
  @return: the current time in the (seconds, microseconds) format
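
  Example of the returned value (illustrative numbers only)::

    >>> TimeStampNow()
    (1230468006, 794310)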

  """
  return utils.SplitTime(time.time())


class _QueuedOpCode(object):
  """Encapsulates an opcode object.

  @ivar log: holds the execution log and consists of tuples
  of the form C{(log_serial, timestamp, level, message)}
  @ivar input: the OpCode we encapsulate
  @ivar status: the current status
  @ivar result: the result of the LU execution
  @ivar start_timestamp: timestamp for the start of the execution
  @ivar end_timestamp: timestamp for the end of the execution

  """
  def __init__(self, op):
    """Constructor for the _QueuedOpCode.

    @type op: L{opcodes.OpCode}
    @param op: the opcode we encapsulate

    """
    self.input = op
    self.status = constants.OP_STATUS_QUEUED
    self.result = None
    self.log = []
    self.start_timestamp = None
    self.end_timestamp = None

  @classmethod
  def Restore(cls, state):
    """Restore the _QueuedOpCode from the serialized form.

    @type state: dict
    @param state: the serialized state
    @rtype: _QueuedOpCode
    @return: a new _QueuedOpCode instance

    """
    obj = _QueuedOpCode.__new__(cls)
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
    obj.status = state["status"]
    obj.result = state["result"]
    obj.log = state["log"]
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)
    return obj

  def Serialize(self):
    """Serializes this _QueuedOpCode.

    @rtype: dict
    @return: the dictionary holding the serialized state
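
    A sketch of the returned dict for a freshly queued opcode
    (illustrative; C{"input"} holds whatever C{OpCode.__getstate__}
    returns)::

      {
        "input": {...},
        "status": constants.OP_STATUS_QUEUED,
        "result": None,
        "log": [],
        "start_timestamp": None,
        "end_timestamp": None,
      }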

    """
    return {
      "input": self.input.__getstate__(),
      "status": self.status,
      "result": self.result,
      "log": self.log,
      "start_timestamp": self.start_timestamp,
      "end_timestamp": self.end_timestamp,
      }


class _QueuedJob(object):
  """In-memory job representation.

  This is what we use to track the user-submitted jobs. Locking must
  be taken care of by users of this class.

  @type queue: L{JobQueue}
  @ivar queue: the parent queue
  @ivar id: the job ID
  @type ops: list
  @ivar ops: the list of _QueuedOpCode that constitute the job
  @type run_op_index: int
  @ivar run_op_index: the currently executing opcode, or -1 if
      we didn't yet start executing
  @type log_serial: int
  @ivar log_serial: holds the index for the next log entry
  @ivar received_timestamp: the timestamp for when the job was received
  @ivar start_timestamp: the timestamp for start of execution
  @ivar end_timestamp: the timestamp for end of execution
  @ivar change: a Condition variable we use for waiting for job changes

  """
  def __init__(self, queue, job_id, ops):
    """Constructor for the _QueuedJob.

    @type queue: L{JobQueue}
    @param queue: our parent queue
    @type job_id: job_id
    @param job_id: our job id
    @type ops: list
    @param ops: the list of opcodes we hold, which will be encapsulated
        in _QueuedOpCodes

    """
    if not ops:
      # TODO: use a better exception
      raise Exception("No opcodes")

    self.queue = queue
    self.id = job_id
    self.ops = [_QueuedOpCode(op) for op in ops]
    self.run_op_index = -1
    self.log_serial = 0
    self.received_timestamp = TimeStampNow()
    self.start_timestamp = None
    self.end_timestamp = None

    # Condition to wait for changes
    self.change = threading.Condition(self.queue._lock)

  @classmethod
  def Restore(cls, queue, state):
    """Restore a _QueuedJob from serialized state.

    @type queue: L{JobQueue}
    @param queue: to which queue the restored job belongs
    @type state: dict
    @param state: the serialized state
    @rtype: _QueuedJob
    @return: the restored _QueuedJob instance

    """
    obj = _QueuedJob.__new__(cls)
    obj.queue = queue
    obj.id = state["id"]
    obj.run_op_index = state["run_op_index"]
    obj.received_timestamp = state.get("received_timestamp", None)
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)

    obj.ops = []
    obj.log_serial = 0
    for op_state in state["ops"]:
      op = _QueuedOpCode.Restore(op_state)
      for log_entry in op.log:
        obj.log_serial = max(obj.log_serial, log_entry[0])
      obj.ops.append(op)

    # Condition to wait for changes
    obj.change = threading.Condition(obj.queue._lock)

    return obj

  def Serialize(self):
    """Serialize the _QueuedJob instance.

    @rtype: dict
    @return: the serialized state

    """
    return {
      "id": self.id,
      "ops": [op.Serialize() for op in self.ops],
      "run_op_index": self.run_op_index,
      "start_timestamp": self.start_timestamp,
      "end_timestamp": self.end_timestamp,
      "received_timestamp": self.received_timestamp,
      }

  def CalcStatus(self):
    """Compute the status of this job.

    This function iterates over all the _QueuedOpCodes in the job and
    based on their status, computes the job status.

    The algorithm is:
      - if we find a cancelled opcode or one finished with error, the
        job status will be the same
      - otherwise, the last opcode with the status one of:
          - waitlock
          - canceling
          - running

        will determine the job status

      - otherwise, it means either all opcodes are queued, or success,
        and the job status will be the same

    @return: the job status
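
    Example (illustrative): opcodes with statuses C{[success, running,
    queued]} yield a job status of C{running}, while C{[success, error,
    queued]} yield C{error}.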

    """
    status = constants.JOB_STATUS_QUEUED

    all_success = True
    for op in self.ops:
      if op.status == constants.OP_STATUS_SUCCESS:
        continue

      all_success = False

      if op.status == constants.OP_STATUS_QUEUED:
        pass
      elif op.status == constants.OP_STATUS_WAITLOCK:
        status = constants.JOB_STATUS_WAITLOCK
      elif op.status == constants.OP_STATUS_RUNNING:
        status = constants.JOB_STATUS_RUNNING
      elif op.status == constants.OP_STATUS_CANCELING:
        status = constants.JOB_STATUS_CANCELING
        break
      elif op.status == constants.OP_STATUS_ERROR:
        status = constants.JOB_STATUS_ERROR
        # The whole job fails if one opcode failed
        break
      elif op.status == constants.OP_STATUS_CANCELED:
        status = constants.JOB_STATUS_CANCELED
        break

    if all_success:
      status = constants.JOB_STATUS_SUCCESS

    return status

  def GetLogEntries(self, newer_than):
    """Selectively returns the log entries.

    @type newer_than: None or int
    @param newer_than: if this is None, return all log entries,
        otherwise return only the log entries with serial higher
        than this value
    @rtype: list
    @return: the list of the log entries selected
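
    Example (illustrative; entries are C{(serial, timestamp, type,
    message)} tuples)::

      job.GetLogEntries(None)  # all log entries
      job.GetLogEntries(2)     # only entries with serial > 2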

    """
    if newer_than is None:
      serial = -1
    else:
      serial = newer_than

    entries = []
    for op in self.ops:
      entries.extend(filter(lambda entry: entry[0] > serial, op.log))

    return entries

class _JobQueueWorker(workerpool.BaseWorker):
  """The actual job workers.

  """
  def _NotifyStart(self):
    """Mark the opcode as running, not lock-waiting.

    This is called from the mcpu code as a notifier function, when the
    LU is finally about to start the Exec() method. Of course, to have
    end-user visible results, the opcode must be initially (before
    calling into Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.

    """
    assert self.queue, "Queue attribute is missing"
    assert self.opcode, "Opcode attribute is missing"

    self.queue.acquire()
    try:
      assert self.opcode.status in (constants.OP_STATUS_WAITLOCK,
                                    constants.OP_STATUS_CANCELING)

      # Cancel here if we were asked to
      if self.opcode.status == constants.OP_STATUS_CANCELING:
        raise CancelJob()

      self.opcode.status = constants.OP_STATUS_RUNNING
    finally:
      self.queue.release()

  def RunTask(self, job):
    """Job executor.

    This function processes a job. It is closely tied to the _QueuedJob and
    _QueuedOpCode classes.

    @type job: L{_QueuedJob}
    @param job: the job to be processed

    """
    logging.debug("Worker %s processing job %s",
                  self.worker_id, job.id)
    proc = mcpu.Processor(self.pool.queue.context)
    self.queue = queue = job.queue
    try:
      try:
        count = len(job.ops)
        for idx, op in enumerate(job.ops):
          try:
            logging.debug("Op %s/%s: Starting %s", idx + 1, count, op)

            queue.acquire()
            try:
              assert op.status == constants.OP_STATUS_QUEUED
              job.run_op_index = idx
              op.status = constants.OP_STATUS_WAITLOCK
              op.result = None
              op.start_timestamp = TimeStampNow()
              if idx == 0: # first opcode
                job.start_timestamp = op.start_timestamp
              queue.UpdateJobUnlocked(job)

              input_opcode = op.input
            finally:
              queue.release()

            def _Log(*args):
              """Append a log entry.

              """
              assert len(args) < 3

              if len(args) == 1:
                log_type = constants.ELOG_MESSAGE
                log_msg = args[0]
              else:
                log_type, log_msg = args

              # The time is split to make serialization easier and not lose
              # precision.
              timestamp = utils.SplitTime(time.time())

              queue.acquire()
              try:
                job.log_serial += 1
                op.log.append((job.log_serial, timestamp, log_type, log_msg))

                job.change.notifyAll()
              finally:
                queue.release()

            # Make sure not to hold lock while _Log is called
            self.opcode = op
            result = proc.ExecOpCode(input_opcode, _Log, self._NotifyStart)

            queue.acquire()
            try:
              op.status = constants.OP_STATUS_SUCCESS
              op.result = result
              op.end_timestamp = TimeStampNow()
              queue.UpdateJobUnlocked(job)
            finally:
              queue.release()

            logging.debug("Op %s/%s: Successfully finished %s",
                          idx + 1, count, op)
          except CancelJob:
            # Will be handled further up
            raise
          except Exception, err:
            queue.acquire()
            try:
              try:
                op.status = constants.OP_STATUS_ERROR
                op.result = str(err)
                op.end_timestamp = TimeStampNow()
                logging.debug("Op %s/%s: Error in %s", idx + 1, count, op)
              finally:
                queue.UpdateJobUnlocked(job)
            finally:
              queue.release()
            raise

      except CancelJob:
        queue.acquire()
        try:
          queue.CancelJobUnlocked(job)
        finally:
          queue.release()
      except errors.GenericError, err:
        logging.exception("Ganeti exception")
      except:
        logging.exception("Unhandled exception")
    finally:
      queue.acquire()
      try:
        try:
          job.run_op_index = -1
          job.end_timestamp = TimeStampNow()
          queue.UpdateJobUnlocked(job)
        finally:
          job_id = job.id
          status = job.CalcStatus()
      finally:
        queue.release()
      logging.debug("Worker %s finished job %s, status = %s",
                    self.worker_id, job_id, status)


class _JobQueueWorkerPool(workerpool.WorkerPool):
  """Simple class implementing a job-processing workerpool.

  """
  def __init__(self, queue):
    super(_JobQueueWorkerPool, self).__init__(JOBQUEUE_THREADS,
                                              _JobQueueWorker)
    self.queue = queue


class JobQueue(object):
  """Queue used to manage the jobs.

  @cvar _RE_JOB_FILE: regex matching the valid job file names

  """
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)

  def _RequireOpenQueue(fn):
    """Decorator for "public" functions.

    This function should be used for all 'public' functions. That is,
    functions usually called from other classes.

    @warning: Use this decorator only after utils.LockedMethod!

    Example::
      @utils.LockedMethod
      @_RequireOpenQueue
      def Example(self):
        pass

    """
    def wrapper(self, *args, **kwargs):
      assert self._queue_lock is not None, "Queue should be open"
      return fn(self, *args, **kwargs)
    return wrapper

  def __init__(self, context):
    """Constructor for JobQueue.

    The constructor will initialize the job queue object and then
    start loading the current jobs from disk, either for starting them
    (if they were queued) or for aborting them (if they were already
    running).

    @type context: GanetiContext
    @param context: the context object for access to the configuration
        data and other ganeti objects

    """
    self.context = context
    self._memcache = weakref.WeakValueDictionary()
    self._my_hostname = utils.HostInfo().name

    # Locking
    self._lock = threading.Lock()
    self.acquire = self._lock.acquire
    self.release = self._lock.release

    # Initialize
    self._queue_lock = jstore.InitAndVerifyQueue(must_lock=True)

    # Read serial file
    self._last_serial = jstore.ReadSerial()
    assert self._last_serial is not None, ("Serial file was modified between"
                                           " check in jstore and here")

    # Get initial list of nodes
    self._nodes = dict((n.name, n.primary_ip)
                       for n in self.context.cfg.GetAllNodesInfo().values()
                       if n.master_candidate)

    # Remove master node
    try:
      del self._nodes[self._my_hostname]
    except KeyError:
      pass

    # TODO: Check consistency across nodes

    # Setup worker pool
    self._wpool = _JobQueueWorkerPool(self)
    try:
      # We need to lock here because WorkerPool.AddTask() may start a job while
      # we're still doing our work.
      self.acquire()
      try:
        logging.info("Inspecting job queue")

        all_job_ids = self._GetJobIDsUnlocked()
        jobs_count = len(all_job_ids)
        lastinfo = time.time()
        for idx, job_id in enumerate(all_job_ids):
          # Give an update every 1000 jobs or 10 seconds
          if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
              idx == (jobs_count - 1)):
            logging.info("Job queue inspection: %d/%d (%0.1f %%)",
                         idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
            lastinfo = time.time()

          job = self._LoadJobUnlocked(job_id)

          # a failure in loading the job can cause 'None' to be returned
          if job is None:
            continue

          status = job.CalcStatus()

          if status in (constants.JOB_STATUS_QUEUED, ):
            self._wpool.AddTask(job)

          elif status in (constants.JOB_STATUS_RUNNING,
                          constants.JOB_STATUS_WAITLOCK,
                          constants.JOB_STATUS_CANCELING):
            logging.warning("Unfinished job %s found: %s", job.id, job)
            try:
              for op in job.ops:
                op.status = constants.OP_STATUS_ERROR
                op.result = "Unclean master daemon shutdown"
            finally:
              self.UpdateJobUnlocked(job)

        logging.info("Job queue inspection finished")
      finally:
        self.release()
    except:
      self._wpool.TerminateWorkers()
      raise

  @utils.LockedMethod
  @_RequireOpenQueue
  def AddNode(self, node):
    """Register a new node with the queue.

    @type node: L{objects.Node}
    @param node: the node object to be added

    """
    node_name = node.name
    assert node_name != self._my_hostname

    # Clean queue directory on added node
    rpc.RpcRunner.call_jobqueue_purge(node_name)

    if not node.master_candidate:
      # remove if existing, ignoring errors
      self._nodes.pop(node_name, None)
      # and skip the replication of the job ids
      return

    # Upload the whole queue excluding archived jobs
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]

    # Upload current serial file
    files.append(constants.JOB_QUEUE_SERIAL_FILE)

    for file_name in files:
      # Read file content
      fd = open(file_name, "r")
      try:
        content = fd.read()
      finally:
        fd.close()

      result = rpc.RpcRunner.call_jobqueue_update([node_name],
                                                  [node.primary_ip],
                                                  file_name, content)
      if not result[node_name]:
        logging.error("Failed to upload %s to %s", file_name, node_name)

    self._nodes[node_name] = node.primary_ip

  @utils.LockedMethod
  @_RequireOpenQueue
  def RemoveNode(self, node_name):
    """Callback called when removing nodes from the cluster.

    @type node_name: str
    @param node_name: the name of the node to remove

    """
    try:
      # The queue is removed by the "leave node" RPC call.
      del self._nodes[node_name]
    except KeyError:
      pass

  def _CheckRpcResult(self, result, nodes, failmsg):
    """Verifies the status of an RPC call.

    Since we aim to keep consistency should this node (the current
    master) fail, we log errors if our RPC calls fail, and especially
    the case when more than half of the nodes fail.

    @param result: the data as returned from the rpc call
    @type nodes: list
    @param nodes: the list of nodes we made the call to
    @type failmsg: str
    @param failmsg: the identifier to be used for logging
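
    Example of the expected result shape (illustrative node names)::

      {"node1.example.com": True, "node2.example.com": False}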

    """
    failed = []
    success = []

    for node in nodes:
      if result[node]:
        success.append(node)
      else:
        failed.append(node)

    if failed:
      logging.error("%s failed on %s", failmsg, ", ".join(failed))

    # +1 for the master node
    if (len(success) + 1) < len(failed):
      # TODO: Handle failing nodes
      logging.error("More than half of the nodes failed")

  def _GetNodeIp(self):
    """Helper for returning the node name/ip list.

    @rtype: (list, list)
    @return: a tuple of two lists, the first one with the node
        names and the second one with the node addresses

    """
    name_list = self._nodes.keys()
    addr_list = [self._nodes[name] for name in name_list]
    return name_list, addr_list

  def _WriteAndReplicateFileUnlocked(self, file_name, data):
    """Writes a file locally and then replicates it to all nodes.

    This function will replace the contents of a file on the local
    node and then replicate it to all the other nodes we have.

    @type file_name: str
    @param file_name: the path of the file to be replicated
    @type data: str
    @param data: the new contents of the file

    """
    utils.WriteFile(file_name, data=data)

    names, addrs = self._GetNodeIp()
    result = rpc.RpcRunner.call_jobqueue_update(names, addrs, file_name, data)
    self._CheckRpcResult(result, self._nodes,
                         "Updating %s" % file_name)

  def _RenameFilesUnlocked(self, rename):
    """Renames a file locally and then replicate the change.

    This function will rename a file in the local queue directory
    and then replicate this rename to all the other nodes we have.

    @type rename: list of (old, new)
    @param rename: List containing tuples mapping old to new names

    """
    for old, new in rename:
      utils.RenameFile(old, new, mkdir=True)

      names, addrs = self._GetNodeIp()
      result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, old, new)
      self._CheckRpcResult(result, self._nodes,
                           "Moving %s to %s" % (old, new))

  def _FormatJobID(self, job_id):
    """Convert a job ID to string format.

    Currently this just does C{str(job_id)} after performing some
    checks, but if we want to change the job id format this will
    abstract this change.

    @type job_id: int or long
    @param job_id: the numeric job id
    @rtype: str
    @return: the formatted job id
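
    Example::

      >>> queue._FormatJobID(42)
      '42'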

    """
    if not isinstance(job_id, (int, long)):
      raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
    if job_id < 0:
      raise errors.ProgrammerError("Job ID %s is negative" % job_id)

    return str(job_id)

  @classmethod
  def _GetArchiveDirectory(cls, job_id):
    """Returns the archive directory for a job.

    @type job_id: str
    @param job_id: Job identifier
    @rtype: str
    @return: Directory name
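
    Example (with the current JOBS_PER_ARCHIVE_DIRECTORY of 10000)::

      >>> JobQueue._GetArchiveDirectory("123456")
      '12'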

    """
    return str(int(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)

  def _NewSerialUnlocked(self):
    """Generates a new job identifier.

    Job identifiers are unique during the lifetime of a cluster.

    @rtype: str
    @return: a string representing the job identifier.

    """
    # New number
    serial = self._last_serial + 1

    # Write to file
    self._WriteAndReplicateFileUnlocked(constants.JOB_QUEUE_SERIAL_FILE,
                                        "%s\n" % serial)

    # Keep it only if we were able to write the file
    self._last_serial = serial

    return self._FormatJobID(serial)

  @staticmethod
  def _GetJobPath(job_id):
    """Returns the job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the job file

    """
    return os.path.join(constants.QUEUE_DIR, "job-%s" % job_id)

  @classmethod
  def _GetArchivedJobPath(cls, job_id):
    """Returns the archived job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the archived job file
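
    Example (illustrative): for job id C{"123456"} this returns
    C{<JOB_QUEUE_ARCHIVE_DIR>/12/job-123456}.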

    """
    path = "%s/job-%s" % (cls._GetArchiveDirectory(job_id), job_id)
    return os.path.join(constants.JOB_QUEUE_ARCHIVE_DIR, path)

  @classmethod
  def _ExtractJobID(cls, name):
    """Extract the job id from a filename.

    @type name: str
    @param name: the job filename
    @rtype: job id or None
    @return: the job id corresponding to the given filename,
        or None if the filename does not represent a valid
        job file

    """
    m = cls._RE_JOB_FILE.match(name)
    if m:
      return m.group(1)
    else:
      return None

  def _GetJobIDsUnlocked(self, archived=False):
    """Return all known job IDs.

    If the parameter archived is True, archived job IDs will be
    included. Currently this argument is unused.

    The method only looks at disk because it's a requirement that all
    jobs are present on disk (so in the _memcache we don't have any
    extra IDs).

    @rtype: list
    @return: the list of job IDs

    """
    jlist = [self._ExtractJobID(name) for name in self._ListJobFiles()]
    jlist = utils.NiceSort(jlist)
    return jlist

  def _ListJobFiles(self):
    """Returns the list of current job files.

    @rtype: list
    @return: the list of job file names

    """
    return [name for name in utils.ListVisibleFiles(constants.QUEUE_DIR)
            if self._RE_JOB_FILE.match(name)]

  def _LoadJobUnlocked(self, job_id):
    """Loads a job from the disk or memory.

    Given a job id, this will return the cached job object if
    existing, or try to load the job from the disk. If loading from
    disk, it will also add the job to the cache.

    @param job_id: the job id
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    job = self._memcache.get(job_id, None)
    if job:
      logging.debug("Found job %s in memcache", job_id)
      return job

    filepath = self._GetJobPath(job_id)
    logging.debug("Loading job from %s", filepath)
    try:
      fd = open(filepath, "r")
    except IOError, err:
      if err.errno in (errno.ENOENT, ):
        return None
      raise
    try:
      data = serializer.LoadJson(fd.read())
    finally:
      fd.close()

    try:
      job = _QueuedJob.Restore(self, data)
    except Exception, err:
      new_path = self._GetArchivedJobPath(job_id)
      if filepath == new_path:
        # job already archived (future case)
        logging.exception("Can't parse job %s", job_id)
      else:
        # non-archived case
        logging.exception("Can't parse job %s, will archive.", job_id)
        self._RenameFilesUnlocked([(filepath, new_path)])
      return None

    self._memcache[job_id] = job
    logging.debug("Added job %s to the cache", job_id)
    return job

  def _GetJobsUnlocked(self, job_ids):
    """Return a list of jobs based on their IDs.

    @type job_ids: list
    @param job_ids: either an empty list (meaning all jobs),
        or a list of job IDs
    @rtype: list
    @return: the list of job objects

    """
    if not job_ids:
      job_ids = self._GetJobIDsUnlocked()

    return [self._LoadJobUnlocked(job_id) for job_id in job_ids]

  @staticmethod
  def _IsQueueMarkedDrain():
    """Check if the queue is marked for drain.

    This currently uses the queue drain file, which makes it a
    per-node flag. In the future this can be moved to the config file.

    @rtype: boolean
    @return: True if the job queue is marked for draining

    """
    return os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)

  @staticmethod
  def SetDrainFlag(drain_flag):
    """Sets the drain flag for the queue.

    This is similar to the function L{backend.JobQueueSetDrainFlag},
    and in the future we might merge them.

    @type drain_flag: boolean
    @param drain_flag: whether to set or unset the drain flag

    """
    if drain_flag:
      utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
    else:
      utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
    return True

  @utils.LockedMethod
  @_RequireOpenQueue
  def SubmitJob(self, ops):
    """Create and store a new job.

    This enters the job into our job queue and also puts it on the new
    queue, in order for it to be picked up by the queue processors.

    @type ops: list
    @param ops: The list of OpCodes that will become the new job.
    @rtype: job ID
    @return: the job ID of the newly created job
    @raise errors.JobQueueDrainError: if the job queue is marked for draining

    """
    if self._IsQueueMarkedDrain():
      raise errors.JobQueueDrainError()

    # Check job queue size
    size = len(self._ListJobFiles())
    if size >= constants.JOB_QUEUE_SIZE_SOFT_LIMIT:
      # TODO: Autoarchive jobs. Make sure it's not done on every job
      # submission, though.
      #size = ...
      pass

    if size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
      raise errors.JobQueueFull()

    # Get job identifier
    job_id = self._NewSerialUnlocked()
    job = _QueuedJob(self, job_id, ops)

    # Write to disk
    self.UpdateJobUnlocked(job)

    logging.debug("Adding new job %s to the cache", job_id)
    self._memcache[job_id] = job

    # Add to worker pool
    self._wpool.AddTask(job)

    return job.id

  @_RequireOpenQueue
  def UpdateJobUnlocked(self, job):
    """Update a job's on disk storage.

    After a job has been modified, this function needs to be called in
    order to write the changes to disk and replicate them to the other
    nodes.

    @type job: L{_QueuedJob}
    @param job: the changed job

    """
    filename = self._GetJobPath(job.id)
    data = serializer.DumpJson(job.Serialize(), indent=False)
    logging.debug("Writing job %s to %s", job.id, filename)
    self._WriteAndReplicateFileUnlocked(filename, data)

    # Notify waiters about potential changes
    job.change.notifyAll()

  @utils.LockedMethod
  @_RequireOpenQueue
  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
                        timeout):
    """Waits for changes in a job.

    @type job_id: string
    @param job_id: Job identifier
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait
    @rtype: tuple (job info, log entries)
    @return: a tuple of the job information as required via
        the fields parameter, and the log entries as a list

        if the job has not changed and the timeout has expired,
        we instead return a special value,
        L{constants.JOB_NOTCHANGED}, which should be interpreted
        as such by the clients

    """
    logging.debug("Waiting for changes in job %s", job_id)
    end_time = time.time() + timeout
    while True:
      delta_time = end_time - time.time()
      if delta_time < 0:
        return constants.JOB_NOTCHANGED

      job = self._LoadJobUnlocked(job_id)
      if not job:
        logging.debug("Job %s not found", job_id)
        break

      status = job.CalcStatus()
      job_info = self._GetJobInfoUnlocked(job, fields)
      log_entries = job.GetLogEntries(prev_log_serial)

      # Serializing and deserializing data can cause type changes (e.g. from
      # tuple to list) or precision loss. We're doing it here so that we get
      # the same modifications as the data received from the client. Without
      # this, the comparison afterwards might fail without the data being
      # significantly different.
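      # (Illustrative example: a timestamp tuple such as (123, 456)
      # comes back from the JSON round-trip as the list [123, 456].)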
      job_info = serializer.LoadJson(serializer.DumpJson(job_info))
      log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))

      if status not in (constants.JOB_STATUS_QUEUED,
                        constants.JOB_STATUS_RUNNING,
                        constants.JOB_STATUS_WAITLOCK):
        # Don't even try to wait if the job is no longer running, there will be
        # no changes.
        break

      if (prev_job_info != job_info or
          (log_entries and prev_log_serial != log_entries[0][0])):
        break

      logging.debug("Waiting again")

      # Release the queue lock while waiting
      job.change.wait(delta_time)

    logging.debug("Job %s changed", job_id)

    return (job_info, log_entries)

  @utils.LockedMethod
  @_RequireOpenQueue
  def CancelJob(self, job_id):
    """Cancels a job.

    This will only succeed if the job has not started yet.

    @type job_id: string
    @param job_id: job ID of job to be cancelled.

    """
    logging.info("Cancelling job %s", job_id)

Michael Hanselmann's avatar
Michael Hanselmann committed
1087
    job = self._LoadJobUnlocked(job_id)
1088
1089
    if not job:
      logging.debug("Job %s not found", job_id)
1090
1091
1092
      return (False, "Job %s not found" % job_id)

    job_status = job.CalcStatus()
1093

1094
1095
    if job_status not in (constants.JOB_STATUS_QUEUED,
                          constants.JOB_STATUS_WAITLOCK):
      logging.debug("Job %s is no longer in the queue", job.id)
      return (False, "Job %s is no longer in the queue" % job.id)

    if job_status == constants.JOB_STATUS_QUEUED:
      self.CancelJobUnlocked(job)
      return (True, "Job %s canceled" % job.id)
    elif job_status == constants.JOB_STATUS_WAITLOCK:
      # The worker will notice the new status and cancel the job
      try:
        for op in job.ops:
          op.status = constants.OP_STATUS_CANCELING
      finally:
        self.UpdateJobUnlocked(job)
      return (True, "Job %s will be canceled" % job.id)

  @_RequireOpenQueue
  def CancelJobUnlocked(self, job):
    """Marks a job as canceled.

    """
    try:
      for op in job.ops:
        op.status = constants.OP_STATUS_ERROR
        op.result = "Job canceled by request"
    finally:
      self.UpdateJobUnlocked(job)

  @_RequireOpenQueue
  def _ArchiveJobsUnlocked(self, jobs):
    """Archives jobs.

    @type jobs: list of L{_QueuedJob}
    @param jobs: Job objects
    @rtype: int
    @return: Number of archived jobs

    """
    archive_jobs = []
    rename_files = []
    for job in jobs:
      if job.CalcStatus() not in (constants.JOB_STATUS_CANCELED,
                                  constants.JOB_STATUS_SUCCESS,
                                  constants.JOB_STATUS_ERROR):
        logging.debug("Job %s is not yet done", job.id)
        continue

      archive_jobs.append(job)

      old = self._GetJobPath(job.id)
      new = self._GetArchivedJobPath(job.id)
      rename_files.append((old, new))

    # TODO: What if 1..n files fail to rename?
    self._RenameFilesUnlocked(rename_files)

    logging.debug("Successfully archived job(s) %s",
                  ", ".join(job.id for job in archive_jobs))

    return len(archive_jobs)

  @utils.LockedMethod
  @_RequireOpenQueue
  def ArchiveJob(self, job_id):
    """Archives a job.

    This is just a wrapper over L{_ArchiveJobsUnlocked}.

    @type job_id: string
    @param job_id: Job ID of job to be archived.
    @rtype: bool
    @return: Whether job was archived

    """
    logging.info("Archiving job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return False

    return self._ArchiveJobsUnlocked([job]) == 1

  @utils.LockedMethod
  @_RequireOpenQueue
  def AutoArchiveJobs(self, age, timeout):
    """Archives all jobs based on age.

    The method will archive all jobs which are older than the age
    parameter. For jobs that don't have an end timestamp, the start
    timestamp will be considered. The special '-1' age will cause
    archival of all jobs (that are not running or queued).

    @type age: int
    @param age: the minimum age in seconds
    @type timeout: float
    @param timeout: the maximum amount of time to spend archiving

    """
    logging.info("Archiving jobs with age more than %s seconds", age)

    now = time.time()
    end_time = now + timeout
    archived_count = 0
    last_touched = 0

    all_job_ids = self._GetJobIDsUnlocked(archived=False)
    pending = []
    for idx, job_id in enumerate(