jqueue.py 12 KB
Newer Older
Iustin Pop's avatar
Iustin Pop committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
#
#

# Copyright (C) 2006, 2007 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the job queue handling."""

24
import os
Michael Hanselmann's avatar
Michael Hanselmann committed
25 26
import logging
import threading
27 28
import errno
import re
Iustin Pop's avatar
Iustin Pop committed
29

Michael Hanselmann's avatar
Michael Hanselmann committed
30
from ganeti import constants
31
from ganeti import serializer
Michael Hanselmann's avatar
Michael Hanselmann committed
32
from ganeti import workerpool
33
from ganeti import opcodes
Iustin Pop's avatar
Iustin Pop committed
34
from ganeti import errors
Michael Hanselmann's avatar
Michael Hanselmann committed
35
from ganeti import mcpu
36
from ganeti import utils
Michael Hanselmann's avatar
Michael Hanselmann committed
37 38 39 40


# Number of worker threads the job queue's pool runs concurrently.
JOBQUEUE_THREADS = 5

Iustin Pop's avatar
Iustin Pop committed
41

Michael Hanselmann's avatar
Michael Hanselmann committed
42 43 44
class _QueuedOpCode(object):
  """Encapsulates an opcode object.

  Access is synchronized by the '_lock' attribute.

  The wrapped opcode ('input'), its 'status' and its 'result' must only
  be accessed through the synchronized accessor methods below.

  """
  def __init__(self, op):
    """Initializes a new queued opcode in the QUEUED state.

    Args:
    - op: the opcodes.OpCode instance to wrap

    """
    self.__Setup(op, constants.OP_STATUS_QUEUED, None)

  def __Setup(self, op_input, status, result):
    """Shared initializer used by __init__ and Restore.

    The parameter was renamed from 'input' to 'op_input' so it no
    longer shadows the built-in of the same name.

    """
    self._lock = threading.Lock()
    self.input = op_input
    self.status = status
    self.result = result

  @classmethod
  def Restore(cls, state):
    """Restore an opcode from its serialized form.

    """
    obj = object.__new__(cls)
    obj.__Setup(opcodes.OpCode.LoadOpCode(state["input"]),
                state["status"], state["result"])
    return obj

  @utils.LockedMethod
  def Serialize(self):
    """Return a dict representation suitable for serialization.

    """
    return {
      "input": self.input.__getstate__(),
      "status": self.status,
      "result": self.result,
      }

  @utils.LockedMethod
  def GetInput(self):
    """Returns the original opcode.

    """
    return self.input

  @utils.LockedMethod
  def SetStatus(self, status, result):
    """Update the opcode status and result.

    """
    self.status = status
    self.result = result

  @utils.LockedMethod
  def GetStatus(self):
    """Get the opcode status.

    """
    return self.status

  @utils.LockedMethod
  def GetResult(self):
    """Get the opcode result.

    """
    return self.result
Michael Hanselmann's avatar
Michael Hanselmann committed
100 101 102 103 104 105 106 107


class _QueuedJob(object):
  """In-memory job representation.

  This is what we use to track the user-submitted jobs.

  """
108
  def __init__(self, storage, job_id, ops):
Michael Hanselmann's avatar
Michael Hanselmann committed
109 110 111 112
    if not ops:
      # TODO
      raise Exception("No opcodes")

113
    self.__Setup(storage, job_id, [_QueuedOpCode(op) for op in ops])
Michael Hanselmann's avatar
Michael Hanselmann committed
114

115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138
  def __Setup(self, storage, job_id, ops):
    self.storage = storage
    self.id = job_id
    self._ops = ops

  @classmethod
  def Restore(cls, storage, state):
    obj = object.__new__(cls)
    obj.__Setup(storage, state["id"],
                [_QueuedOpCode.Restore(op_state) for op_state in state["ops"]])
    return obj

  def Serialize(self):
    return {
      "id": self.id,
      "ops": [op.Serialize() for op in self._ops],
      }

  def SetUnclean(self, msg):
    try:
      for op in self._ops:
        op.SetStatus(constants.OP_STATUS_ERROR, msg)
    finally:
      self.storage.UpdateJob(self)
Michael Hanselmann's avatar
Michael Hanselmann committed
139

140
  def GetStatus(self):
Michael Hanselmann's avatar
Michael Hanselmann committed
141 142 143 144
    status = constants.JOB_STATUS_QUEUED

    all_success = True
    for op in self._ops:
145 146
      op_status = op.GetStatus()
      if op_status == constants.OP_STATUS_SUCCESS:
Michael Hanselmann's avatar
Michael Hanselmann committed
147 148 149 150
        continue

      all_success = False

151
      if op_status == constants.OP_STATUS_QUEUED:
Michael Hanselmann's avatar
Michael Hanselmann committed
152
        pass
153
      elif op_status == constants.OP_STATUS_RUNNING:
Michael Hanselmann's avatar
Michael Hanselmann committed
154
        status = constants.JOB_STATUS_RUNNING
155 156 157 158
      elif op_status == constants.OP_STATUS_ERROR:
        status = constants.JOB_STATUS_ERROR
        # The whole job fails if one opcode failed
        break
Michael Hanselmann's avatar
Michael Hanselmann committed
159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175

    if all_success:
      status = constants.JOB_STATUS_SUCCESS

    return status

  def Run(self, proc):
    """Job executor.

    This functions processes a this job in the context of given processor
    instance.

    Args:
    - proc: Ganeti Processor to run the job with

    """
    try:
176 177
      count = len(self._ops)
      for idx, op in enumerate(self._ops):
Michael Hanselmann's avatar
Michael Hanselmann committed
178
        try:
179 180
          logging.debug("Op %s/%s: Starting %s", idx + 1, count, op)
          op.SetStatus(constants.OP_STATUS_RUNNING, None)
181
          self.storage.UpdateJob(self)
Michael Hanselmann's avatar
Michael Hanselmann committed
182 183 184

          result = proc.ExecOpCode(op.input)

185
          op.SetStatus(constants.OP_STATUS_SUCCESS, result)
186
          self.storage.UpdateJob(self)
187 188
          logging.debug("Op %s/%s: Successfully finished %s",
                        idx + 1, count, op)
Michael Hanselmann's avatar
Michael Hanselmann committed
189
        except Exception, err:
190 191 192 193 194
          try:
            op.SetStatus(constants.OP_STATUS_ERROR, str(err))
            logging.debug("Op %s/%s: Error in %s", idx + 1, count, op)
          finally:
            self.storage.UpdateJob(self)
Michael Hanselmann's avatar
Michael Hanselmann committed
195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224
          raise

    except errors.GenericError, err:
      logging.error("ganeti exception %s", exc_info=err)
    except Exception, err:
      logging.error("unhandled exception %s", exc_info=err)
    except:
      logging.error("unhandled unknown exception %s", exc_info=err)


class _JobQueueWorker(workerpool.BaseWorker):
  """Worker thread that executes jobs taken from the queue.

  """
  def RunTask(self, job):
    """Run a single job to completion, logging start and final status.

    """
    logging.debug("Worker %s processing job %s",
                  self.worker_id, job.id)
    # TODO: feedback function
    processor = mcpu.Processor(self.pool.context, feedback=lambda x: None)
    try:
      job.Run(processor)
    finally:
      logging.debug("Worker %s finished job %s, status = %s",
                    self.worker_id, job.id, job.GetStatus())


class _JobQueueWorkerPool(workerpool.WorkerPool):
  """Pool of _JobQueueWorker threads serving the job queue.

  """
  def __init__(self, context):
    workerpool.WorkerPool.__init__(self, JOBQUEUE_THREADS,
                                   _JobQueueWorker)
    self.context = context


225
class JobStorage(object):
226
  _RE_JOB_FILE = re.compile(r"^job-(\d+)$")
227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315

  def __init__(self):
    self._lock = threading.Lock()

    # Make sure our directory exists
    try:
      os.mkdir(constants.QUEUE_DIR, 0700)
    except OSError, err:
      if err.errno not in (errno.EEXIST, ):
        raise

    # Get queue lock
    self.lock_fd = open(constants.JOB_QUEUE_LOCK_FILE, "w")
    try:
      utils.LockFile(self.lock_fd)
    except:
      self.lock_fd.close()
      raise

    # Read version
    try:
      version_fd = open(constants.JOB_QUEUE_VERSION_FILE, "r")
    except IOError, err:
      if err.errno not in (errno.ENOENT, ):
        raise

      # Setup a new queue
      self._InitQueueUnlocked()

      # Try to open again
      version_fd = open(constants.JOB_QUEUE_VERSION_FILE, "r")

    try:
      # Try to read version
      version = int(version_fd.read(128))

      # Verify version
      if version != constants.JOB_QUEUE_VERSION:
        raise errors.JobQueueError("Found version %s, expected %s",
                                   version, constants.JOB_QUEUE_VERSION)
    finally:
      version_fd.close()

    serial_fd = open(constants.JOB_QUEUE_SERIAL_FILE, "r")
    try:
      # Read last serial
      self._last_serial = int(serial_fd.read(1024).strip())
    finally:
      serial_fd.close()

  def Close(self):
    assert self.lock_fd, "Queue should be open"

    self.lock_fd.close()
    self.lock_fd = None

  def _InitQueueUnlocked(self):
    assert self.lock_fd, "Queue should be open"

    utils.WriteFile(constants.JOB_QUEUE_VERSION_FILE,
                    data="%s\n" % constants.JOB_QUEUE_VERSION)
    utils.WriteFile(constants.JOB_QUEUE_SERIAL_FILE,
                    data="%s\n" % 0)

  def _NewSerialUnlocked(self):
    """Generates a new job identifier.

    Job identifiers are unique during the lifetime of a cluster.

    Returns: A string representing the job identifier.

    """
    assert self.lock_fd, "Queue should be open"

    # New number
    serial = self._last_serial + 1

    # Write to file
    utils.WriteFile(constants.JOB_QUEUE_SERIAL_FILE,
                    data="%s\n" % serial)

    # Keep it only if we were able to write the file
    self._last_serial = serial

    return serial

  def _GetJobPath(self, job_id):
    return os.path.join(constants.QUEUE_DIR, "job-%s" % job_id)

316 317 318 319 320 321 322 323
  def _GetJobIDsUnlocked(self, archived=False):
    """Return all known job IDs.

    If the parameter archived is True, archived jobs IDs will be
    included. Currently this argument is unused.

    """
    jfiles = self._ListJobFiles()
Iustin Pop's avatar
Iustin Pop committed
324
    return [int(m.group(1)) for m in
325 326
            [self._RE_JOB_FILE.match(name) for name in jfiles]]

327 328 329 330 331 332
  def _ListJobFiles(self):
    assert self.lock_fd, "Queue should be open"

    return [name for name in utils.ListVisibleFiles(constants.QUEUE_DIR)
            if self._RE_JOB_FILE.match(name)]

333
  def _LoadJobUnlocked(self, job_id):
334 335
    assert self.lock_fd, "Queue should be open"

336
    filepath = self._GetJobPath(job_id)
337 338 339 340 341 342 343 344 345 346 347 348 349 350 351
    logging.debug("Loading job from %s", filepath)
    try:
      fd = open(filepath, "r")
    except IOError, err:
      if err.errno in (errno.ENOENT, ):
        return None
      raise
    try:
      data = serializer.LoadJson(fd.read())
    finally:
      fd.close()

    return _QueuedJob.Restore(self, data)

  def _GetJobsUnlocked(self, job_ids):
352 353
    if not job_ids:
      job_ids = self._GetJobIDsUnlocked()
354

355
    return [self._LoadJobUnlocked(job_id) for job_id in job_ids]
356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389

  @utils.LockedMethod
  def GetJobs(self, job_ids):
    return self._GetJobsUnlocked(job_ids)

  @utils.LockedMethod
  def AddJob(self, ops):
    assert self.lock_fd, "Queue should be open"

    # Get job identifier
    job_id = self._NewSerialUnlocked()
    job = _QueuedJob(self, job_id, ops)

    # Write to disk
    self._UpdateJobUnlocked(job)

    return job

  def _UpdateJobUnlocked(self, job):
    assert self.lock_fd, "Queue should be open"

    filename = self._GetJobPath(job.id)
    logging.debug("Writing job %s to %s", job.id, filename)
    utils.WriteFile(filename,
                    data=serializer.DumpJson(job.Serialize(), indent=False))

  @utils.LockedMethod
  def UpdateJob(self, job):
    return self._UpdateJobUnlocked(job)

  def ArchiveJob(self, job_id):
    raise NotImplementedError()


Michael Hanselmann's avatar
Michael Hanselmann committed
390 391 392 393 394 395
class JobQueue:
  """The job queue.

  Dispatches submitted jobs to a worker pool and answers queries about
  them.  On startup, jobs left over from a previous master daemon run
  are either re-queued or marked as failed.

  """
  def __init__(self, context):
    self._lock = threading.Lock()
    self._jobs = JobStorage()
    self._wpool = _JobQueueWorkerPool(context)

    # Recover jobs persisted by a previous instance
    for job in self._jobs.GetJobs(None):
      job_status = job.GetStatus()
      if job_status in (constants.JOB_STATUS_QUEUED, ):
        # Never started, so just hand it to the workers now
        self._wpool.AddTask(job)

      elif job_status in (constants.JOB_STATUS_RUNNING, ):
        logging.warning("Unfinished job %s found: %s", job.id, job)
        job.SetUnclean("Unclean master daemon shutdown")

  @utils.LockedMethod
  def SubmitJob(self, ops):
    """Add a new job to the queue.

    This enters the job into our job queue and also puts it on the new
    queue, in order for it to be picked up by the queue processors.

    Args:
    - ops: Sequence of opcodes

    """
    job = self._jobs.AddJob(ops)

    # Add to worker pool
    self._wpool.AddTask(job)

    return job.id

  def ArchiveJob(self, job_id):
    raise NotImplementedError()

  def CancelJob(self, job_id):
    raise NotImplementedError()

  def _GetJobInfo(self, job, fields):
    """Build one result row for a job, one entry per requested field.

    Raises errors.OpExecError for unknown field names.

    """
    values = []
    for field_name in fields:
      if field_name == "id":
        values.append(job.id)
      elif field_name == "status":
        values.append(job.GetStatus())
      elif field_name == "ops":
        values.append([op.GetInput().__getstate__() for op in job._ops])
      elif field_name == "opresult":
        values.append([op.GetResult() for op in job._ops])
      elif field_name == "opstatus":
        values.append([op.GetStatus() for op in job._ops])
      else:
        raise errors.OpExecError("Invalid job query field '%s'" % field_name)
    return values

  def QueryJobs(self, job_ids, fields):
    """Returns a list of jobs in queue.

    Args:
    - job_ids: Sequence of job identifiers or None for all
    - fields: Names of fields to return

    """
    self._lock.acquire()
    try:
      result = []
      # Unknown job IDs yield None entries in the result
      for job in self._jobs.GetJobs(job_ids):
        if job is None:
          result.append(None)
        else:
          result.append(self._GetJobInfo(job, fields))
      return result
    finally:
      self._lock.release()

  @utils.LockedMethod
  def Shutdown(self):
    """Stops the job queue.

    """
    self._wpool.TerminateWorkers()
    self._jobs.Close()