jqueue.py 38.2 KB
Newer Older
Iustin Pop's avatar
Iustin Pop committed
1
2
3
#
#

# Copyright (C) 2006, 2007, 2008 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


22
23
"""Module implementing the job queue handling.

Locking: there's a single, large lock in the L{JobQueue} class. It's
used by all other classes in this module.

@var JOBQUEUE_THREADS: the number of worker threads we start for
    processing jobs

"""
Iustin Pop's avatar
Iustin Pop committed
31

32
import os
Michael Hanselmann's avatar
Michael Hanselmann committed
33
34
import logging
import threading
35
36
import errno
import re
37
import time
38
import weakref
Iustin Pop's avatar
Iustin Pop committed
39

Michael Hanselmann's avatar
Michael Hanselmann committed
40
from ganeti import constants
41
from ganeti import serializer
Michael Hanselmann's avatar
Michael Hanselmann committed
42
from ganeti import workerpool
43
from ganeti import opcodes
Iustin Pop's avatar
Iustin Pop committed
44
from ganeti import errors
Michael Hanselmann's avatar
Michael Hanselmann committed
45
from ganeti import mcpu
46
from ganeti import utils
47
from ganeti import jstore
48
from ganeti import rpc
Michael Hanselmann's avatar
Michael Hanselmann committed
49

50

51
JOBQUEUE_THREADS = 25
52
JOBS_PER_ARCHIVE_DIRECTORY = 10000
Michael Hanselmann's avatar
Michael Hanselmann committed
53

Iustin Pop's avatar
Iustin Pop committed
54

55
class CancelJob(Exception):
  """Signals that the job being processed must be cancelled.

  It is raised while an opcode is being started and caught by the job
  executor, which then cancels the whole job.

  """


61
def TimeStampNow():
  """Return the current time as a split timestamp.

  @rtype: tuple
  @return: the current time in the (seconds, microseconds) format

  """
  now = time.time()
  return utils.SplitTime(now)


Michael Hanselmann's avatar
Michael Hanselmann committed
71
class _QueuedOpCode(object):
Michael Hanselmann's avatar
Michael Hanselmann committed
72
  """Encapsulates an opcode object.
Michael Hanselmann's avatar
Michael Hanselmann committed
73

74
75
76
77
78
79
80
  @ivar log: holds the execution log and consists of tuples
  of the form C{(log_serial, timestamp, level, message)}
  @ivar input: the OpCode we encapsulate
  @ivar status: the current status
  @ivar result: the result of the LU execution
  @ivar start_timestamp: timestamp for the start of the execution
  @ivar stop_timestamp: timestamp for the end of the execution
81

Michael Hanselmann's avatar
Michael Hanselmann committed
82
  """
83
84
85
86
  __slots__ = ["input", "status", "result", "log",
               "start_timestamp", "end_timestamp",
               "__weakref__"]

Michael Hanselmann's avatar
Michael Hanselmann committed
87
  def __init__(self, op):
88
89
90
91
92
93
    """Constructor for the _QuededOpCode.

    @type op: L{opcodes.OpCode}
    @param op: the opcode we encapsulate

    """
Michael Hanselmann's avatar
Michael Hanselmann committed
94
95
96
97
    self.input = op
    self.status = constants.OP_STATUS_QUEUED
    self.result = None
    self.log = []
98
99
    self.start_timestamp = None
    self.end_timestamp = None
100
101
102

  @classmethod
  def Restore(cls, state):
103
104
105
106
107
108
109
110
    """Restore the _QueuedOpCode from the serialized form.

    @type state: dict
    @param state: the serialized state
    @rtype: _QueuedOpCode
    @return: a new _QueuedOpCode instance

    """
Michael Hanselmann's avatar
Michael Hanselmann committed
111
112
113
114
115
    obj = _QueuedOpCode.__new__(cls)
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
    obj.status = state["status"]
    obj.result = state["result"]
    obj.log = state["log"]
116
117
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)
118
119
120
    return obj

  def Serialize(self):
121
122
123
124
125
126
    """Serializes this _QueuedOpCode.

    @rtype: dict
    @return: the dictionary holding the serialized state

    """
127
128
129
130
131
    return {
      "input": self.input.__getstate__(),
      "status": self.status,
      "result": self.result,
      "log": self.log,
132
133
      "start_timestamp": self.start_timestamp,
      "end_timestamp": self.end_timestamp,
134
      }
135

Michael Hanselmann's avatar
Michael Hanselmann committed
136
137
138
139

class _QueuedJob(object):
  """In-memory job representation.

140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
  This is what we use to track the user-submitted jobs. Locking must
  be taken care of by users of this class.

  @type queue: L{JobQueue}
  @ivar queue: the parent queue
  @ivar id: the job ID
  @type ops: list
  @ivar ops: the list of _QueuedOpCode that constitute the job
  @type run_op_index: int
  @ivar run_op_index: the currently executing opcode, or -1 if
      we didn't yet start executing
  @type log_serial: int
  @ivar log_serial: holds the index for the next log entry
  @ivar received_timestamp: the timestamp for when the job was received
  @ivar start_timestmap: the timestamp for start of execution
  @ivar end_timestamp: the timestamp for end of execution
  @ivar change: a Condition variable we use for waiting for job changes
Michael Hanselmann's avatar
Michael Hanselmann committed
157
158

  """
159
160
161
162
163
  __slots__ = ["queue", "id", "ops", "run_op_index", "log_serial",
               "received_timestamp", "start_timestamp", "end_timestamp",
               "change",
               "__weakref__"]

Michael Hanselmann's avatar
Michael Hanselmann committed
164
  def __init__(self, queue, job_id, ops):
165
166
167
168
169
170
171
172
173
174
175
    """Constructor for the _QueuedJob.

    @type queue: L{JobQueue}
    @param queue: our parent queue
    @type job_id: job_id
    @param job_id: our job id
    @type ops: list
    @param ops: the list of opcodes we hold, which will be encapsulated
        in _QueuedOpCodes

    """
Michael Hanselmann's avatar
Michael Hanselmann committed
176
    if not ops:
177
      # TODO: use a better exception
Michael Hanselmann's avatar
Michael Hanselmann committed
178
179
      raise Exception("No opcodes")

Michael Hanselmann's avatar
Michael Hanselmann committed
180
    self.queue = queue
181
    self.id = job_id
Michael Hanselmann's avatar
Michael Hanselmann committed
182
183
    self.ops = [_QueuedOpCode(op) for op in ops]
    self.run_op_index = -1
184
    self.log_serial = 0
185
186
187
    self.received_timestamp = TimeStampNow()
    self.start_timestamp = None
    self.end_timestamp = None
188
189
190

    # Condition to wait for changes
    self.change = threading.Condition(self.queue._lock)
191
192

  @classmethod
Michael Hanselmann's avatar
Michael Hanselmann committed
193
  def Restore(cls, queue, state):
194
195
196
197
198
199
200
201
202
203
    """Restore a _QueuedJob from serialized state:

    @type queue: L{JobQueue}
    @param queue: to which queue the restored job belongs
    @type state: dict
    @param state: the serialized state
    @rtype: _JobQueue
    @return: the restored _JobQueue instance

    """
Michael Hanselmann's avatar
Michael Hanselmann committed
204
205
206
207
    obj = _QueuedJob.__new__(cls)
    obj.queue = queue
    obj.id = state["id"]
    obj.run_op_index = state["run_op_index"]
208
209
210
    obj.received_timestamp = state.get("received_timestamp", None)
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)
211
212
213
214
215
216
217
218
219
220
221
222

    obj.ops = []
    obj.log_serial = 0
    for op_state in state["ops"]:
      op = _QueuedOpCode.Restore(op_state)
      for log_entry in op.log:
        obj.log_serial = max(obj.log_serial, log_entry[0])
      obj.ops.append(op)

    # Condition to wait for changes
    obj.change = threading.Condition(obj.queue._lock)

223
224
225
    return obj

  def Serialize(self):
226
227
228
229
230
231
    """Serialize the _JobQueue instance.

    @rtype: dict
    @return: the serialized state

    """
232
233
    return {
      "id": self.id,
Michael Hanselmann's avatar
Michael Hanselmann committed
234
      "ops": [op.Serialize() for op in self.ops],
235
      "run_op_index": self.run_op_index,
236
237
238
      "start_timestamp": self.start_timestamp,
      "end_timestamp": self.end_timestamp,
      "received_timestamp": self.received_timestamp,
239
240
      }

Michael Hanselmann's avatar
Michael Hanselmann committed
241
  def CalcStatus(self):
242
243
244
245
246
247
248
249
250
251
    """Compute the status of this job.

    This function iterates over all the _QueuedOpCodes in the job and
    based on their status, computes the job status.

    The algorithm is:
      - if we find a cancelled, or finished with error, the job
        status will be the same
      - otherwise, the last opcode with the status one of:
          - waitlock
252
          - canceling
253
254
255
256
257
258
259
260
261
262
          - running

        will determine the job status

      - otherwise, it means either all opcodes are queued, or success,
        and the job status will be the same

    @return: the job status

    """
Michael Hanselmann's avatar
Michael Hanselmann committed
263
264
265
    status = constants.JOB_STATUS_QUEUED

    all_success = True
Michael Hanselmann's avatar
Michael Hanselmann committed
266
267
    for op in self.ops:
      if op.status == constants.OP_STATUS_SUCCESS:
Michael Hanselmann's avatar
Michael Hanselmann committed
268
269
270
271
        continue

      all_success = False

Michael Hanselmann's avatar
Michael Hanselmann committed
272
      if op.status == constants.OP_STATUS_QUEUED:
Michael Hanselmann's avatar
Michael Hanselmann committed
273
        pass
Iustin Pop's avatar
Iustin Pop committed
274
275
      elif op.status == constants.OP_STATUS_WAITLOCK:
        status = constants.JOB_STATUS_WAITLOCK
Michael Hanselmann's avatar
Michael Hanselmann committed
276
      elif op.status == constants.OP_STATUS_RUNNING:
Michael Hanselmann's avatar
Michael Hanselmann committed
277
        status = constants.JOB_STATUS_RUNNING
278
279
280
      elif op.status == constants.OP_STATUS_CANCELING:
        status = constants.JOB_STATUS_CANCELING
        break
Michael Hanselmann's avatar
Michael Hanselmann committed
281
      elif op.status == constants.OP_STATUS_ERROR:
282
283
284
        status = constants.JOB_STATUS_ERROR
        # The whole job fails if one opcode failed
        break
Michael Hanselmann's avatar
Michael Hanselmann committed
285
      elif op.status == constants.OP_STATUS_CANCELED:
286
287
        status = constants.OP_STATUS_CANCELED
        break
Michael Hanselmann's avatar
Michael Hanselmann committed
288
289
290
291
292
293

    if all_success:
      status = constants.JOB_STATUS_SUCCESS

    return status

294
  def GetLogEntries(self, newer_than):
295
296
297
    """Selectively returns the log entries.

    @type newer_than: None or int
Michael Hanselmann's avatar
Michael Hanselmann committed
298
    @param newer_than: if this is None, return all log entries,
299
300
301
302
303
304
        otherwise return only the log entries with serial higher
        than this value
    @rtype: list
    @return: the list of the log entries selected

    """
305
306
307
308
309
310
311
    if newer_than is None:
      serial = -1
    else:
      serial = newer_than

    entries = []
    for op in self.ops:
312
      entries.extend(filter(lambda entry: entry[0] > serial, op.log))
313
314
315

    return entries

316

Michael Hanselmann's avatar
Michael Hanselmann committed
317
class _JobQueueWorker(workerpool.BaseWorker):
318
319
320
  """The actual job workers.

  """
Iustin Pop's avatar
Iustin Pop committed
321
322
323
324
325
326
327
328
329
330
331
332
333
334
  def _NotifyStart(self):
    """Mark the opcode as running, not lock-waiting.

    This is called from the mcpu code as a notifier function, when the
    LU is finally about to start the Exec() method. Of course, to have
    end-user visible results, the opcode must be initially (before
    calling into Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.

    """
    assert self.queue, "Queue attribute is missing"
    assert self.opcode, "Opcode attribute is missing"

    self.queue.acquire()
    try:
335
336
337
338
339
340
341
      assert self.opcode.status in (constants.OP_STATUS_WAITLOCK,
                                    constants.OP_STATUS_CANCELING)

      # Cancel here if we were asked to
      if self.opcode.status == constants.OP_STATUS_CANCELING:
        raise CancelJob()

Iustin Pop's avatar
Iustin Pop committed
342
343
344
345
      self.opcode.status = constants.OP_STATUS_RUNNING
    finally:
      self.queue.release()

Michael Hanselmann's avatar
Michael Hanselmann committed
346
  def RunTask(self, job):
Michael Hanselmann's avatar
Michael Hanselmann committed
347
348
    """Job executor.

349
350
    This functions processes a job. It is closely tied to the _QueuedJob and
    _QueuedOpCode classes.
Michael Hanselmann's avatar
Michael Hanselmann committed
351

352
353
354
    @type job: L{_QueuedJob}
    @param job: the job to be processed

Michael Hanselmann's avatar
Michael Hanselmann committed
355
    """
356
    logging.info("Worker %s processing job %s",
Michael Hanselmann's avatar
Michael Hanselmann committed
357
                  self.worker_id, job.id)
358
    proc = mcpu.Processor(self.pool.queue.context)
Iustin Pop's avatar
Iustin Pop committed
359
    self.queue = queue = job.queue
Michael Hanselmann's avatar
Michael Hanselmann committed
360
    try:
Michael Hanselmann's avatar
Michael Hanselmann committed
361
362
363
      try:
        count = len(job.ops)
        for idx, op in enumerate(job.ops):
364
          op_summary = op.input.Summary()
Michael Hanselmann's avatar
Michael Hanselmann committed
365
          try:
366
367
            logging.info("Op %s/%s: Starting opcode %s", idx + 1, count,
                         op_summary)
Michael Hanselmann's avatar
Michael Hanselmann committed
368
369
370

            queue.acquire()
            try:
371
372
              if op.status == constants.OP_STATUS_CANCELED:
                raise CancelJob()
373
              assert op.status == constants.OP_STATUS_QUEUED
Michael Hanselmann's avatar
Michael Hanselmann committed
374
              job.run_op_index = idx
Iustin Pop's avatar
Iustin Pop committed
375
              op.status = constants.OP_STATUS_WAITLOCK
Michael Hanselmann's avatar
Michael Hanselmann committed
376
              op.result = None
377
              op.start_timestamp = TimeStampNow()
378
379
              if idx == 0: # first opcode
                job.start_timestamp = op.start_timestamp
Michael Hanselmann's avatar
Michael Hanselmann committed
380
381
              queue.UpdateJobUnlocked(job)

Iustin Pop's avatar
Iustin Pop committed
382
              input_opcode = op.input
Michael Hanselmann's avatar
Michael Hanselmann committed
383
384
385
            finally:
              queue.release()

386
            def _Log(*args):
387
388
389
390
391
392
393
394
395
396
397
398
399
400
              """Append a log entry.

              """
              assert len(args) < 3

              if len(args) == 1:
                log_type = constants.ELOG_MESSAGE
                log_msg = args[0]
              else:
                log_type, log_msg = args

              # The time is split to make serialization easier and not lose
              # precision.
              timestamp = utils.SplitTime(time.time())
401

402
              queue.acquire()
403
              try:
404
405
406
                job.log_serial += 1
                op.log.append((job.log_serial, timestamp, log_type, log_msg))

407
408
                job.change.notifyAll()
              finally:
409
                queue.release()
410

411
            # Make sure not to hold lock while _Log is called
Iustin Pop's avatar
Iustin Pop committed
412
413
            self.opcode = op
            result = proc.ExecOpCode(input_opcode, _Log, self._NotifyStart)
Michael Hanselmann's avatar
Michael Hanselmann committed
414
415
416
417
418

            queue.acquire()
            try:
              op.status = constants.OP_STATUS_SUCCESS
              op.result = result
419
              op.end_timestamp = TimeStampNow()
Michael Hanselmann's avatar
Michael Hanselmann committed
420
421
422
423
              queue.UpdateJobUnlocked(job)
            finally:
              queue.release()

424
425
            logging.info("Op %s/%s: Successfully finished opcode %s",
                         idx + 1, count, op_summary)
426
427
428
          except CancelJob:
            # Will be handled further up
            raise
Michael Hanselmann's avatar
Michael Hanselmann committed
429
430
431
432
433
434
          except Exception, err:
            queue.acquire()
            try:
              try:
                op.status = constants.OP_STATUS_ERROR
                op.result = str(err)
435
                op.end_timestamp = TimeStampNow()
436
437
                logging.info("Op %s/%s: Error in opcode %s: %s",
                             idx + 1, count, op_summary, err)
Michael Hanselmann's avatar
Michael Hanselmann committed
438
439
440
441
442
443
              finally:
                queue.UpdateJobUnlocked(job)
            finally:
              queue.release()
            raise

444
445
446
447
448
449
      except CancelJob:
        queue.acquire()
        try:
          queue.CancelJobUnlocked(job)
        finally:
          queue.release()
Michael Hanselmann's avatar
Michael Hanselmann committed
450
451
452
453
      except errors.GenericError, err:
        logging.exception("Ganeti exception")
      except:
        logging.exception("Unhandled exception")
Michael Hanselmann's avatar
Michael Hanselmann committed
454
    finally:
Michael Hanselmann's avatar
Michael Hanselmann committed
455
456
      queue.acquire()
      try:
457
458
        try:
          job.run_op_idx = -1
459
          job.end_timestamp = TimeStampNow()
460
461
462
463
          queue.UpdateJobUnlocked(job)
        finally:
          job_id = job.id
          status = job.CalcStatus()
Michael Hanselmann's avatar
Michael Hanselmann committed
464
465
      finally:
        queue.release()
466
467
      logging.info("Worker %s finished job %s, status = %s",
                   self.worker_id, job_id, status)
Michael Hanselmann's avatar
Michael Hanselmann committed
468
469
470


class _JobQueueWorkerPool(workerpool.WorkerPool):
  """Worker pool running L{_JobQueueWorker} threads for a job queue.

  """
  def __init__(self, queue):
    """Create the pool with JOBQUEUE_THREADS workers.

    @param queue: the job queue owning this pool; it is stored on the
        pool so the workers can reach it

    """
    num_workers = JOBQUEUE_THREADS
    worker_class = _JobQueueWorker
    super(_JobQueueWorkerPool, self).__init__(num_workers, worker_class)
    self.queue = queue
Michael Hanselmann's avatar
Michael Hanselmann committed
478
479


Michael Hanselmann's avatar
Michael Hanselmann committed
480
class JobQueue(object):
  """Queue used to manage the jobs.

  @cvar _RE_JOB_FILE: regex matching the valid job file names

  """
  # Anchored at both ends; group 1 captures the job ID part of the name
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
487

488
489
490
  def _RequireOpenQueue(fn):
    """Decorator for "public" functions.

491
492
    This function should be used for all 'public' functions. That is,
    functions usually called from other classes.
493

494
    @warning: Use this decorator only after utils.LockedMethod!
495

496
    Example::
497
498
499
500
501
502
503
      @utils.LockedMethod
      @_RequireOpenQueue
      def Example(self):
        pass

    """
    def wrapper(self, *args, **kwargs):
504
      assert self._queue_lock is not None, "Queue should be open"
505
506
507
      return fn(self, *args, **kwargs)
    return wrapper

Michael Hanselmann's avatar
Michael Hanselmann committed
508
  def __init__(self, context):
    """Constructor for JobQueue.

    The constructor will initialize the job queue object and then
    start loading the current jobs from disk, either for starting them
    (if they were queued) or for aborting them (if they were already
    running).

    @type context: GanetiContext
    @param context: the context object for access to the configuration
        data and other ganeti objects

    """
    self.context = context
    # Weak references only: jobs are dropped once nobody else uses them
    self._memcache = weakref.WeakValueDictionary()
    self._my_hostname = utils.HostInfo().name

    # The single lock of this module, with shortcuts for its methods
    self._lock = threading.Lock()
    self.acquire = self._lock.acquire
    self.release = self._lock.release

    # Open and verify the on-disk queue
    self._queue_lock = jstore.InitAndVerifyQueue(must_lock=True)

    # Last job serial number used so far
    self._last_serial = jstore.ReadSerial()
    assert self._last_serial is not None, ("Serial file was modified between"
                                           " check in jstore and here")

    # Name -> primary IP of the master candidates, without ourselves
    candidates = {}
    for node in self.context.cfg.GetAllNodesInfo().values():
      if node.master_candidate:
        candidates[node.name] = node.primary_ip
    candidates.pop(self._my_hostname, None)
    self._nodes = candidates

    # TODO: Check consistency across nodes

    # Setup worker pool
    self._wpool = _JobQueueWorkerPool(self)
    try:
      # We need to lock here because WorkerPool.AddTask() may start a job while
      # we're still doing our work.
      self.acquire()
      try:
        logging.info("Inspecting job queue")

        known_ids = self._GetJobIDsUnlocked()
        total = len(known_ids)
        last_report = time.time()
        for position, job_id in enumerate(known_ids):
          # Give an update every 1000 jobs or 10 seconds
          if (position % 1000 == 0 or
              time.time() >= (last_report + 10.0) or
              position == (total - 1)):
            logging.info("Job queue inspection: %d/%d (%0.1f %%)",
                         position, total - 1,
                         100.0 * (position + 1) / total)
            last_report = time.time()

          job = self._LoadJobUnlocked(job_id)

          # a failure in loading the job can cause 'None' to be returned
          if job is None:
            continue

          job_status = job.CalcStatus()

          if job_status == constants.JOB_STATUS_QUEUED:
            # Not started yet: hand it over to the workers
            self._wpool.AddTask(job)
            continue

          if job_status not in (constants.JOB_STATUS_RUNNING,
                                constants.JOB_STATUS_WAITLOCK,
                                constants.JOB_STATUS_CANCELING):
            continue

          # The job was being processed when the daemon stopped: all of
          # its opcodes are marked as failed
          logging.warning("Unfinished job %s found: %s", job.id, job)
          try:
            for queued_op in job.ops:
              queued_op.status = constants.OP_STATUS_ERROR
              queued_op.result = "Unclean master daemon shutdown"
          finally:
            self.UpdateJobUnlocked(job)

        logging.info("Job queue inspection finished")
      finally:
        self.release()
    except:
      # Do not leave worker threads behind if the inspection failed
      self._wpool.TerminateWorkers()
      raise
Michael Hanselmann's avatar
Michael Hanselmann committed
599

600
601
  @utils.LockedMethod
  @_RequireOpenQueue
  def AddNode(self, node):
    """Register a new node with the queue.

    The queue directory on the node is purged first. A node which is
    not a master candidate is then only dropped from the replication
    list; a master candidate receives all current job files plus the
    serial file and is added to the replication list.

    @type node: L{objects.Node}
    @param node: the node object to be added

    """
    name = node.name
    assert name != self._my_hostname

    # Clean queue directory on added node
    rpc.RpcRunner.call_jobqueue_purge(name)

    if not node.master_candidate:
      # remove if existing, ignoring errors, and skip the replication
      # of the job ids
      self._nodes.pop(name, None)
      return

    # Upload the whole queue excluding archived jobs ...
    to_upload = []
    for job_id in self._GetJobIDsUnlocked():
      to_upload.append(self._GetJobPath(job_id))

    # ... followed by the current serial file
    to_upload.append(constants.JOB_QUEUE_SERIAL_FILE)

    for path in to_upload:
      # Read file content
      handle = open(path, "r")
      try:
        payload = handle.read()
      finally:
        handle.close()

      reply = rpc.RpcRunner.call_jobqueue_update([name], [node.primary_ip],
                                                 path, payload)
      # A failed upload is only logged, the remaining files are still sent
      if not reply[name]:
        logging.error("Failed to upload %s to %s", path, name)

    self._nodes[name] = node.primary_ip
642
643
644
645

  @utils.LockedMethod
  @_RequireOpenQueue
  def RemoveNode(self, node_name):
    """Callback called when removing nodes from the cluster.

    Unknown node names are silently ignored.

    @type node_name: str
    @param node_name: the name of the node to remove

    """
    # The queue is removed by the "leave node" RPC call.
    self._nodes.pop(node_name, None)

658
  def _CheckRpcResult(self, result, nodes, failmsg):
659
660
661
662
    """Verifies the status of an RPC call.

    Since we aim to keep consistency should this node (the current
    master) fail, we will log errors if our rpc fail, and especially
Michael Hanselmann's avatar
Michael Hanselmann committed
663
    log the case when more than half of the nodes fails.
664
665
666
667
668
669
670
671

    @param result: the data as returned from the rpc call
    @type nodes: list
    @param nodes: the list of nodes we made the call to
    @type failmsg: str
    @param failmsg: the identifier to be used for logging

    """
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
    failed = []
    success = []

    for node in nodes:
      if result[node]:
        success.append(node)
      else:
        failed.append(node)

    if failed:
      logging.error("%s failed on %s", failmsg, ", ".join(failed))

    # +1 for the master node
    if (len(success) + 1) < len(failed):
      # TODO: Handle failing nodes
      logging.error("More than half of the nodes failed")

689
690
691
  def _GetNodeIp(self):
    """Helper for returning the node name/ip list.

692
693
694
695
    @rtype: (list, list)
    @return: a tuple of two lists, the first one with the node
        names and the second one with the node addresses

696
697
698
699
700
    """
    name_list = self._nodes.keys()
    addr_list = [self._nodes[name] for name in name_list]
    return name_list, addr_list

701
702
703
  def _WriteAndReplicateFileUnlocked(self, file_name, data):
    """Writes a file locally and then replicates it to all nodes.

    The local file is written first; the same contents are then sent
    to all the nodes known to the queue and the outcome of that call
    is checked (failures are logged).

    @type file_name: str
    @param file_name: the path of the file to be replicated
    @type data: str
    @param data: the new contents of the file

    """
    utils.WriteFile(file_name, data=data)

    node_names, node_addrs = self._GetNodeIp()
    rpc_result = rpc.RpcRunner.call_jobqueue_update(node_names, node_addrs,
                                                    file_name, data)
    failmsg = "Updating %s" % file_name
    self._CheckRpcResult(rpc_result, self._nodes, failmsg)
719

720
  def _RenameFilesUnlocked(self, rename):
    """Renames files locally and then replicates the change.

    All the renames are done on the local node first (target
    directories are created as needed); the same list is then sent to
    all the other nodes we have and the outcome is checked.

    @type rename: list of (old, new)
    @param rename: List containing tuples mapping old to new names

    """
    # Rename them locally
    for source, target in rename:
      utils.RenameFile(source, target, mkdir=True)

    # ... and on all nodes
    node_names, node_addrs = self._GetNodeIp()
    rpc_result = rpc.RpcRunner.call_jobqueue_rename(node_names, node_addrs,
                                                    rename)
    failmsg = "Renaming files (%r)" % rename
    self._CheckRpcResult(rpc_result, self._nodes, failmsg)
738

Michael Hanselmann's avatar
Michael Hanselmann committed
739
  def _FormatJobID(self, job_id):
    """Convert a job ID to string format.

    Currently this just does C{str(job_id)} after performing some
    checks, but if we want to change the job id format this will
    abstract this change.

    @type job_id: int or long
    @param job_id: the numeric job id
    @rtype: str
    @return: the formatted job id
    @raise errors.ProgrammerError: if the job id is not an integer or
        is negative

    """
    is_numeric = isinstance(job_id, (int, long))
    if not is_numeric:
      raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)

    if job_id < 0:
      raise errors.ProgrammerError("Job ID %s is negative" % job_id)

    return str(job_id)

759
760
761
762
763
764
765
766
767
768
769
770
  @classmethod
  def _GetArchiveDirectory(cls, job_id):
    """Returns the archive directory for a job.

    Jobs are grouped by JOBS_PER_ARCHIVE_DIRECTORY: the directory name
    is the numeric job ID divided by that value, rounded down.

    @type job_id: str
    @param job_id: Job identifier
    @rtype: str
    @return: Directory name

    """
    # Explicit floor division: "/" only rounds down for integers under
    # the classic division rules
    return str(int(job_id) // JOBS_PER_ARCHIVE_DIRECTORY)

771
  def _NewSerialUnlocked(self):
    """Generates a new job identifier.

    Job identifiers are unique during the lifetime of a cluster.

    @rtype: str
    @return: a string representing the job identifier.

    """
    next_serial = self._last_serial + 1

    # Store the new number in the serial file, locally and on the nodes
    contents = "%s\n" % next_serial
    self._WriteAndReplicateFileUnlocked(constants.JOB_QUEUE_SERIAL_FILE,
                                        contents)

    # Keep it only if we were able to write the file
    self._last_serial = next_serial

    return self._FormatJobID(next_serial)
791

Michael Hanselmann's avatar
Michael Hanselmann committed
792
793
  @staticmethod
  def _GetJobPath(job_id):
    """Build the path of the file holding a given job.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the job file

    """
    file_name = "job-%s" % job_id
    return os.path.join(constants.QUEUE_DIR, file_name)

804
805
  @classmethod
  def _GetArchivedJobPath(cls, job_id):
    """Build the path of the archived file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the archived job file

    """
    subdir = cls._GetArchiveDirectory(job_id)
    relative = "%s/job-%s" % (subdir, job_id)
    return os.path.join(constants.JOB_QUEUE_ARCHIVE_DIR, relative)
816

Michael Hanselmann's avatar
Michael Hanselmann committed
817
818
  @classmethod
  def _ExtractJobID(cls, name):
    """Extract the job id from a filename.

    @type name: str
    @param name: the job filename
    @rtype: job id or None
    @return: the job id corresponding to the given filename,
        or None if the filename does not represent a valid
        job file

    """
    match = cls._RE_JOB_FILE.match(name)
    if not match:
      return None

    # The first group of the pattern holds the job id
    return match.group(1)

835
836
837
838
839
840
  def _GetJobIDsUnlocked(self, archived=False):
    """Return all known job IDs.

    Only the files reported by L{_ListJobFiles} are considered; this is
    sufficient because it's a requirement that all jobs are present on
    disk (so in the _memcache we don't have any extra IDs).

    @type archived: boolean
    @param archived: intended to request that archived job IDs are
        included as well; currently this argument is unused
    @rtype: list
    @return: the list of job IDs, ordered by L{utils.NiceSort}

    """
    job_ids = []
    for filename in self._ListJobFiles():
      job_ids.append(self._ExtractJobID(filename))
    return utils.NiceSort(job_ids)
852

853
  def _ListJobFiles(self):
    """Returns the list of current job files.

    The visible entries of C{constants.QUEUE_DIR} are filtered down to
    those whose name matches C{_RE_JOB_FILE}.

    @rtype: list
    @return: the list of job file names

    """
    job_files = []
    for entry in utils.ListVisibleFiles(constants.QUEUE_DIR):
      if self._RE_JOB_FILE.match(entry):
        job_files.append(entry)
    return job_files

863
  def _LoadJobUnlocked(self, job_id):
864
865
866
867
868
869
870
871
872
873
874
    """Loads a job from the disk or memory.

    Given a job id, this will return the cached job object if
    existing, or try to load the job from the disk. If loading from
    disk, it will also add the job to the cache.

    @param job_id: the job id
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
875
876
    job = self._memcache.get(job_id, None)
    if job:
877
      logging.debug("Found job %s in memcache", job_id)
878
      return job
Iustin Pop's avatar
Iustin Pop committed
879

880
    filepath = self._GetJobPath(job_id)
881
882
883
884
885
886
887
888
889
890
891
892
    logging.debug("Loading job from %s", filepath)
    try:
      fd = open(filepath, "r")
    except IOError, err:
      if err.errno in (errno.ENOENT, ):
        return None
      raise
    try:
      data = serializer.LoadJson(fd.read())
    finally:
      fd.close()

893
894
895
896
897
898
899
900
901
902
    try:
      job = _QueuedJob.Restore(self, data)
    except Exception, err:
      new_path = self._GetArchivedJobPath(job_id)
      if filepath == new_path:
        # job already archived (future case)
        logging.exception("Can't parse job %s", job_id)
      else:
        # non-archived case
        logging.exception("Can't parse job %s, will archive.", job_id)
903
        self._RenameFilesUnlocked([(filepath, new_path)])
904
905
      return None

Iustin Pop's avatar
Iustin Pop committed
906
    self._memcache[job_id] = job
907
    logging.debug("Added job %s to the cache", job_id)
Iustin Pop's avatar
Iustin Pop committed
908
    return job
909
910

  def _GetJobsUnlocked(self, job_ids):
911
912
913
914
915
916
917
918
919
    """Return a list of jobs based on their IDs.

    @type job_ids: list
    @param job_ids: either an empty list (meaning all jobs),
        or a list of job IDs
    @rtype: list
    @return: the list of job objects

    """
920
921
    if not job_ids:
      job_ids = self._GetJobIDsUnlocked()
922

923
    return [self._LoadJobUnlocked(job_id) for job_id in job_ids]
924

925
926
927
928
929
930
931
  @staticmethod
  def _IsQueueMarkedDrain():
    """Check if the queue is marked for drain.

    This currently uses the queue drain file, which makes it a
    per-node flag. In the future this can be moved to the config file.

    @rtype: boolean
    @return: True if the job queue is marked for draining, i.e. if
        C{constants.JOB_QUEUE_DRAIN_FILE} exists

    """
    drain_file = constants.JOB_QUEUE_DRAIN_FILE
    return os.path.exists(drain_file)

938
939
940
941
942
943
944
  @staticmethod
  def SetDrainFlag(drain_flag):
    """Sets the drain flag for the queue.

    This is similar to the function L{backend.JobQueueSetDrainFlag},
    and in the future we might merge them.

    The flag is represented by C{constants.JOB_QUEUE_DRAIN_FILE}:
    setting it writes that file with empty contents, clearing it
    removes the file.

    @type drain_flag: boolean
    @param drain_flag: Whether to set or unset the drain flag
    @rtype: boolean
    @return: always True

    """
    if not drain_flag:
      utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
    else:
      utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
    return True

955
  @utils.LockedMethod
  @_RequireOpenQueue
  def SubmitJob(self, ops):
    """Create and store a new job.

    This enters the job into our job queue and also puts it on the new
    queue, in order for it to be picked up by the queue processors.

    @type ops: list
    @param ops: The list of OpCodes that will become the new job.
    @rtype: job ID
    @return: the job ID of the newly created job
    @raise errors.JobQueueDrainError: if the job queue is marked for
        draining
    @raise errors.JobQueueFull: if the number of job files has reached
        C{constants.JOB_QUEUE_SIZE_HARD_LIMIT}

    """
    if self._IsQueueMarkedDrain():
      raise errors.JobQueueDrainError()

    # The queue size is the number of job files currently on disk
    queue_size = len(self._ListJobFiles())
    if queue_size >= constants.JOB_QUEUE_SIZE_SOFT_LIMIT:
      # TODO: Autoarchive jobs. Make sure it's not done on every job
      # submission, though.
      #queue_size = ...
      pass

    if queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
      raise errors.JobQueueFull()

    # Allocate an identifier and build the job object
    job_id = self._NewSerialUnlocked()
    new_job = _QueuedJob(self, job_id, ops)

    # Store the job on disk before making it known anywhere else
    self.UpdateJobUnlocked(new_job)

    logging.debug("Adding new job %s to the cache", job_id)
    self._memcache[job_id] = new_job

    # Hand the job over to the worker pool
    self._wpool.AddTask(new_job)

    return new_job.id
998

999
  @_RequireOpenQueue
  def UpdateJobUnlocked(self, job):
    """Update a job's on disk storage.

    After a job has been modified, this function needs to be called in
    order to write the changes to disk and replicate them to the other
    nodes. Once the file has been written, everybody waiting on the
    job's C{change} condition is notified.

    @type job: L{_QueuedJob}
    @param job: the changed job

    """
    job_path = self._GetJobPath(job.id)
    serialized = serializer.DumpJson(job.Serialize(), indent=False)
    logging.debug("Writing job %s to %s", job.id, job_path)
    self._WriteAndReplicateFileUnlocked(job_path, serialized)

    # Wake up waiters so they can check for changes
    job.change.notifyAll()
1018

1019
  @utils.LockedMethod
  @_RequireOpenQueue
  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
                        timeout):
    """Waits for changes in a job.

    @type job_id: string
    @param job_id: Job identifier
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait
    @rtype: tuple (job info, log entries) or None
    @return: a tuple of the job information as required via
        the fields parameter, and the log entries as a list

        if the job has not changed and the timeout has expired,
        we instead return a special value,
        L{constants.JOB_NOTCHANGED}, which should be interpreted
        as such by the clients

        if the job cannot be found, None is returned

    """
    logging.debug("Waiting for changes in job %s", job_id)
    end_time = time.time() + timeout
    while True:
      delta_time = end_time - time.time()
      if delta_time < 0:
        return constants.JOB_NOTCHANGED

      job = self._LoadJobUnlocked(job_id)
      if not job:
        # Return right away: job_info and log_entries are either not
        # assigned yet (first pass) or belong to a previous pass, so they
        # must not be handed to the caller.
        logging.debug("Job %s not found", job_id)
        return None

      status = job.CalcStatus()
      job_info = self._GetJobInfoUnlocked(job, fields)
      log_entries = job.GetLogEntries(prev_log_serial)

      # Serializing and deserializing data can cause type changes (e.g. from
      # tuple to list) or precision loss. We're doing it here so that we get
      # the same modifications as the data received from the client. Without
      # this, the comparison afterwards might fail without the data being
      # significantly different.
      job_info = serializer.LoadJson(serializer.DumpJson(job_info))
      log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))

      if status not in (constants.JOB_STATUS_QUEUED,
                        constants.JOB_STATUS_RUNNING,
                        constants.JOB_STATUS_WAITLOCK):
        # Don't even try to wait if the job is no longer running, there will be
        # no changes.
        break

      # Something differs from what the caller already has: report it
      if (prev_job_info != job_info or
          (log_entries and prev_log_serial != log_entries[0][0])):
        break

      logging.debug("Waiting again")

      # Release the queue lock while waiting
      job.change.wait(delta_time)

    logging.debug("Job %s changed", job_id)

    return (job_info, log_entries)
1088

1089
  @utils.LockedMethod
  @_RequireOpenQueue
  def CancelJob(self, job_id):
    """Cancels a job.

    This will only succeed if the job has not started yet: a queued job
    is canceled right away, while for a job waiting for locks all
    opcodes are flagged as canceling and the job is written back.

    @type job_id: string
    @param job_id: job ID of job to be cancelled.
    @rtype: tuple (boolean, string)
    @return: whether the request was accepted, and a message describing
        the outcome

    """
    logging.info("Cancelling job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return (False, "Job %s not found" % job_id)

    status = job.CalcStatus()

    if status == constants.JOB_STATUS_QUEUED:
      self.CancelJobUnlocked(job)
      return (True, "Job %s canceled" % job.id)

    if status == constants.JOB_STATUS_WAITLOCK:
      # The worker will notice the new status and cancel the job
      try:
        for opcode in job.ops:
          opcode.status = constants.OP_STATUS_CANCELING
      finally:
        self.UpdateJobUnlocked(job)
      return (True, "Job %s will be canceled" % job.id)

    # Any other status means the job can't be canceled anymore
    logging.debug("Job %s is no longer in the queue", job.id)
    return (False, "Job %s is no longer in the queue" % job.id)

  @_RequireOpenQueue
  def CancelJobUnlocked(self, job):
    """Marks a job as canceled.

    Every opcode of the job gets the canceled status and a fixed result
    message. The job is written back through L{UpdateJobUnlocked} even
    if updating the opcodes raised.

    @type job: L{_QueuedJob}
    @param job: the job to mark as canceled

    """
    try:
      for opcode in job.ops:
        opcode.status = constants.OP_STATUS_CANCELED
        opcode.result = "Job canceled by request"
    finally:
      # Always persist whatever state the opcodes are in by now
      self.UpdateJobUnlocked(job)
1138

1139
  @_RequireOpenQueue
1140
1141
  def _ArchiveJobsUnlocked(self, jobs):
    """Archives jobs.
1142

1143
    @type jobs: list of L{_QueuedJob}
Iustin Pop's avatar
Iustin Pop committed
1144
    @param jobs: Job objects
1145
1146
    @rtype: int
    @return: Number of archived jobs
1147
1148

    """
1149
1150
1151
1152
1153
1154
1155
1156
    archive_jobs = []
    rename_files = []
    for job in jobs:
      if job.CalcStatus() not in (constants.JOB_STATUS_CANCELED,
                                  constants.JOB_STATUS_SUCCESS,
                                  constants.JOB_STATUS_ERROR):
        logging.debug("Job %s is not yet done", job.id)
        continue
1157

1158
      archive_jobs.append(job)
1159

1160
1161
1162
      old = self._GetJobPath(job.id)
      new = self._GetArchivedJobPath(job.id)
      rename_files.append((old, new))
1163

1164
1165
    # TODO: What if 1..n files fail to rename?
    self._RenameFilesUnlocked(rename_files)
1166