#
#

# Copyright (C) 2006, 2007, 2008, 2010, 2011, 2012 Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# 1. Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
# IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
# TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
# LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
# NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.


"""Module with helper classes and functions for daemons"""


import asyncore
import asynchat
import collections
import os
import signal
import logging
import sched
import time
import socket
import select
import sys

from ganeti import utils
from ganeti import constants
from ganeti import errors
from ganeti import netutils
from ganeti import ssconf
from ganeti import runtime
from ganeti import compat


class SchedulerBreakout(Exception):
  """Exception used to get out of the scheduler loop

  """


def AsyncoreDelayFunction(timeout):
  """Asyncore-compatible scheduler delay function.

  This is a delay function for sched that, rather than actually sleeping,
  executes asyncore events happening in the meantime.

  After an event has occurred, rather than returning, it raises a
  SchedulerBreakout exception, which will force the current scheduler.run()
  invocation to terminate, so that we can also check for signals. The main loop
  will then call the scheduler run again, which will allow it to actually
  process any due events.

  This is needed because scheduler.run() doesn't support a count=... parameter
  the way asyncore.loop() does, and the sched module documents raising
  exceptions from inside the delay function as an allowed usage model.

  """
  asyncore.loop(timeout=timeout, count=1, use_poll=True)
  raise SchedulerBreakout()


class AsyncoreScheduler(sched.scheduler):
  """Event scheduler integrated with asyncore

  """
  def __init__(self, timefunc):
    """Initializes this class.

    """
    sched.scheduler.__init__(self, timefunc, self._LimitedDelay)
    self._max_delay = None

  def run(self, max_delay=None): # pylint: disable=W0221
    """Run any pending events.

    @type max_delay: None or number
    @param max_delay: Maximum delay (useful if caller has timeouts running)

    """
    assert self._max_delay is None

    # The delay function used by the scheduler can't be different on each run,
    # hence an instance variable must be used.
    if max_delay is None:
      self._max_delay = None
    else:
      self._max_delay = utils.RunningTimeout(max_delay, False)

    try:
      return sched.scheduler.run(self)
    finally:
      self._max_delay = None

  def _LimitedDelay(self, duration):
    """Custom delay function for C{sched.scheduler}.

    """
    if self._max_delay is None:
      timeout = duration
    else:
      timeout = min(duration, self._max_delay.Remaining())

    return AsyncoreDelayFunction(timeout)

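# Illustrative usage sketch (not part of the original module; _Tick is a
# hypothetical callback): timed events are normally registered through
# Mainloop.scheduler, an AsyncoreScheduler, using the standard sched API:
#
#   mainloop = Mainloop()
#
#   def _Tick():
#     # do the periodic work, then re-register the event
#     mainloop.scheduler.enter(60.0, 1, _Tick, [])
#
#   mainloop.scheduler.enter(60.0, 1, _Tick, [])
#   mainloop.Run()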

class GanetiBaseAsyncoreDispatcher(asyncore.dispatcher):
  """Base Ganeti Asyncore Dispacher

  """
  # this method is overriding an asyncore.dispatcher method
  def handle_error(self):
    """Log an error in handling any request, and proceed.

    """
    logging.exception("Error while handling asyncore request")

  # this method is overriding an asyncore.dispatcher method
  def writable(self):
    """Most of the time we don't want to check for writability.

    """
    return False


class AsyncStreamServer(GanetiBaseAsyncoreDispatcher):
  """A stream server to use with asyncore.

  Each request is accepted, and then dispatched to a separate asyncore
  dispatcher to handle.

  """

  _REQUEST_QUEUE_SIZE = 5

  def __init__(self, family, address):
    """Constructor for AsyncStreamServer

    @type family: integer
    @param family: socket family (one of socket.AF_*)
    @type address: address family dependent
    @param address: address to bind the socket to

    """
    GanetiBaseAsyncoreDispatcher.__init__(self)
    self.family = family
    self.create_socket(self.family, socket.SOCK_STREAM)
    self.set_reuse_addr()
    self.bind(address)
    self.listen(self._REQUEST_QUEUE_SIZE)

  # this method is overriding an asyncore.dispatcher method
  def handle_accept(self):
    """Accept a new client connection.

    Creates a new instance of the handler class, which will use asyncore to
    serve the client.

    """
    accept_result = utils.IgnoreSignals(self.accept)
    if accept_result is not None:
      connected_socket, client_address = accept_result
      if self.family == socket.AF_UNIX:
        # override the client address, as for unix sockets nothing meaningful
        # is passed in from accept anyway
        client_address = netutils.GetSocketCredentials(connected_socket)
      logging.info("Accepted connection from %s",
                   netutils.FormatAddress(client_address, family=self.family))
      self.handle_connection(connected_socket, client_address)

  def handle_connection(self, connected_socket, client_address):
    """Handle an already accepted connection.

    """
    raise NotImplementedError

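# Illustrative sketch (not part of the original module; MyStreamServer and
# MyMessageHandler are hypothetical names): a subclass only needs to override
# handle_connection and hand the accepted socket to a per-client dispatcher,
# for instance one built on AsyncTerminatedMessageStream below:
#
#   class MyStreamServer(AsyncStreamServer):
#     def handle_connection(self, connected_socket, client_address):
#       # the handler registers itself with asyncore and serves the client
#       MyMessageHandler(connected_socket, client_address)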

class AsyncTerminatedMessageStream(asynchat.async_chat):
  """A terminator separated message stream asyncore module.

  Handles a stream connection receiving messages terminated by a defined
  separator. For each complete message handle_message is called.

  """
  def __init__(self, connected_socket, peer_address, terminator, family,
               unhandled_limit):
    """AsyncTerminatedMessageStream constructor.

    @type connected_socket: socket.socket
    @param connected_socket: connected stream socket to receive messages from
    @param peer_address: family-specific peer address
    @type terminator: string
    @param terminator: terminator separating messages in the stream
    @type family: integer
    @param family: socket family
    @type unhandled_limit: integer or None
    @param unhandled_limit: maximum unanswered messages

    """
    # python 2.4/2.5 uses conn=... while 2.6 has sock=... we have to cheat by
    # using a positional argument rather than a keyword one.
    asynchat.async_chat.__init__(self, connected_socket)
    self.connected_socket = connected_socket
    # on python 2.4 there is no "family" attribute for the socket class
    # FIXME: when we move to python 2.5 or above remove the family parameter
    #self.family = self.connected_socket.family
    self.family = family
    self.peer_address = peer_address
    self.terminator = terminator
    self.unhandled_limit = unhandled_limit
    self.set_terminator(terminator)
    self.ibuffer = []
    self.receive_count = 0
    self.send_count = 0
    self.oqueue = collections.deque()
    self.iqueue = collections.deque()

  # this method is overriding an asynchat.async_chat method
  def collect_incoming_data(self, data):
    self.ibuffer.append(data)

  def _can_handle_message(self):
    return (self.unhandled_limit is None or
            (self.receive_count < self.send_count + self.unhandled_limit) and
             not self.iqueue)

  # this method is overriding an asynchat.async_chat method
  def found_terminator(self):
    message = "".join(self.ibuffer)
    self.ibuffer = []
    message_id = self.receive_count
    # We need to increase the receive_count after checking if the message can
    # be handled, but before calling handle_message
    can_handle = self._can_handle_message()
    self.receive_count += 1
    if can_handle:
      self.handle_message(message, message_id)
    else:
      self.iqueue.append((message, message_id))

  def handle_message(self, message, message_id):
    """Handle a terminated message.

    @type message: string
    @param message: message to handle
    @type message_id: integer
    @param message_id: stream's message sequence number

    """
    pass
    # TODO: move this method to raise NotImplementedError
    # raise NotImplementedError

  def send_message(self, message):
    """Send a message to the remote peer. This function is thread-safe.

    @type message: string
    @param message: message to send, without the terminator

    @warning: If calling this function from a thread different than the one
    performing the main asyncore loop, remember that you have to wake that one
    up.

    """
    # If we just append the message we received to the output queue, this
    # function can be safely called by multiple threads at the same time, and
    # we don't need locking, since deques are thread safe. handle_write in the
    # asyncore thread will handle the next input message if there are any
    # enqueued.
    self.oqueue.append(message)

  # this method is overriding an asyncore.dispatcher method
  def readable(self):
    # read from the socket if we can handle the next requests
    return self._can_handle_message() and asynchat.async_chat.readable(self)

  # this method is overriding an asyncore.dispatcher method
  def writable(self):
    # the output queue may be filled just after we called writable. This only
    # works if we know something else will wake us up from the select in that
    # case anyway.
    return asynchat.async_chat.writable(self) or self.oqueue

  # this method is overriding an asyncore.dispatcher method
  def handle_write(self):
    if self.oqueue:
      # if we have data in the output queue, then send_message was called.
      # this means we can process one more message from the input queue, if
      # there are any.
      data = self.oqueue.popleft()
      self.push(data + self.terminator)
      self.send_count += 1
      if self.iqueue:
        self.handle_message(*self.iqueue.popleft())
    self.initiate_send()

  def close_log(self):
    logging.info("Closing connection from %s",
                 netutils.FormatAddress(self.peer_address, family=self.family))
    self.close()

  # this method is overriding an asyncore.dispatcher method
  def handle_expt(self):
    self.close_log()

  # this method is overriding an asyncore.dispatcher method
  def handle_error(self):
    """Log an error in handling any request, and proceed.

    """
    logging.exception("Error while handling asyncore request")
    self.close_log()

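# Illustrative sketch (not part of the original module; EchoHandler and the
# "\3" terminator are hypothetical): a minimal handler over a unix stream
# socket that answers each complete message with the same payload, allowing
# at most one unanswered message at a time:
#
#   class EchoHandler(AsyncTerminatedMessageStream):
#     def __init__(self, connected_socket, client_address):
#       AsyncTerminatedMessageStream.__init__(self, connected_socket,
#                                             client_address, "\3",
#                                             socket.AF_UNIX, 1)
#
#     def handle_message(self, message, message_id):
#       self.send_message(message)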

class AsyncUDPSocket(GanetiBaseAsyncoreDispatcher):
  """An improved asyncore udp socket.

  """
  def __init__(self, family):
    """Constructor for AsyncUDPSocket

    """
    GanetiBaseAsyncoreDispatcher.__init__(self)
    self._out_queue = []
    self._family = family
    self.create_socket(family, socket.SOCK_DGRAM)

  # this method is overriding an asyncore.dispatcher method
  def handle_connect(self):
    # Python thinks that the first udp message from a source qualifies as a
    # "connect" and further ones are part of the same connection. We beg to
    # differ and treat all messages equally.
    pass

  # this method is overriding an asyncore.dispatcher method
  def handle_read(self):
    recv_result = utils.IgnoreSignals(self.recvfrom,
                                      constants.MAX_UDP_DATA_SIZE)
    if recv_result is not None:
      payload, address = recv_result
      if self._family == socket.AF_INET6:
        # we ignore 'flow info' and 'scope id' as we don't need them
        ip, port, _, _ = address
      else:
        ip, port = address

      self.handle_datagram(payload, ip, port)

  def handle_datagram(self, payload, ip, port):
    """Handle an already read udp datagram

    """
    raise NotImplementedError

  # this method is overriding an asyncore.dispatcher method
  def writable(self):
    # We should check whether we can write to the socket only if we have
    # something scheduled to be written
    return bool(self._out_queue)

  # this method is overriding an asyncore.dispatcher method
  def handle_write(self):
    if not self._out_queue:
      logging.error("handle_write called with empty output queue")
      return
    (ip, port, payload) = self._out_queue[0]
    utils.IgnoreSignals(self.sendto, payload, 0, (ip, port))
    self._out_queue.pop(0)

  def enqueue_send(self, ip, port, payload):
    """Enqueue a datagram to be sent when possible

    """
    if len(payload) > constants.MAX_UDP_DATA_SIZE:
      raise errors.UdpDataSizeError("Packet too big: %s > %s" % (len(payload),
                                    constants.MAX_UDP_DATA_SIZE))
    self._out_queue.append((ip, port, payload))

  def process_next_packet(self, timeout=0):
    """Process the next datagram, waiting for it if necessary.

    @type timeout: float
    @param timeout: how long to wait for data
    @rtype: boolean
    @return: True if some data has been handled, False otherwise

    """
    result = utils.WaitForFdCondition(self, select.POLLIN, timeout)
    if result is not None and result & select.POLLIN:
      self.handle_read()
      return True
    else:
      return False

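# Illustrative sketch (not part of the original module; PingResponder is a
# hypothetical name): a subclass only has to implement handle_datagram;
# replies are queued with enqueue_send and written once the socket is ready:
#
#   class PingResponder(AsyncUDPSocket):
#     def handle_datagram(self, payload, ip, port):
#       # queue a reply; it is sent when asyncore reports the socket writable
#       self.enqueue_send(ip, port, "pong")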

class AsyncAwaker(GanetiBaseAsyncoreDispatcher):
  """A way to notify the asyncore loop that something is going on.

  If an asyncore daemon is multithreaded, then when a thread tries to push some
  data to a socket the main loop handling asynchronous requests might be
  sleeping in a select(). To avoid this the daemon can create an instance of
  AsyncAwaker, which other threads can use to wake the main loop up.

  """
  def __init__(self, signal_fn=None):
    """Constructor for AsyncAwaker

    @type signal_fn: function
    @param signal_fn: function to call when awakened

    """
    GanetiBaseAsyncoreDispatcher.__init__(self)
    assert signal_fn is None or callable(signal_fn)
    (self.in_socket, self.out_socket) = socket.socketpair(socket.AF_UNIX,
                                                          socket.SOCK_STREAM)
    self.in_socket.setblocking(0)
    self.in_socket.shutdown(socket.SHUT_WR)
    self.out_socket.shutdown(socket.SHUT_RD)
    self.set_socket(self.in_socket)
    self.need_signal = True
    self.signal_fn = signal_fn
    self.connected = True

  # this method is overriding an asyncore.dispatcher method
  def handle_read(self):
    utils.IgnoreSignals(self.recv, 4096)
    if self.signal_fn:
      self.signal_fn()
    self.need_signal = True

  # this method is overriding an asyncore.dispatcher method
  def close(self):
    asyncore.dispatcher.close(self)
    self.out_socket.close()

  def signal(self):
    """Signal the asyncore main loop.

    Any data we send here will be ignored, but it will cause the select() call
    to return.

    """
    # Yes, there is a race condition here. No, we don't care, at worst we're
    # sending more than one wakeup token, which doesn't harm at all.
    if self.need_signal:
      self.need_signal = False
      self.out_socket.send(chr(0))

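# Illustrative sketch (not part of the original module; "handler" and "reply"
# are hypothetical): a worker thread that queues data for the asyncore loop
# has to wake the loop up afterwards, otherwise the data may sit in the queue
# until some unrelated event triggers the select():
#
#   awaker = AsyncAwaker()
#
#   # in a worker thread, after enqueueing output on some dispatcher:
#   handler.send_message(reply)
#   awaker.signal()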

class _ShutdownCheck(object):
  """Logic for L{Mainloop} shutdown.

  """
  def __init__(self, fn):
    """Initializes this class.

    @type fn: callable
    @param fn: Function returning C{None} if mainloop can be stopped or a
      duration in seconds after which the function should be called again
    @see: L{Mainloop.Run}

    """
    assert callable(fn)

    self._fn = fn
    self._defer = None

  def CanShutdown(self):
    """Checks whether mainloop can be stopped.

    @rtype: bool

    """
    if self._defer and self._defer.Remaining() > 0:
      # A deferred check has already been scheduled
      return False

    # Ask mainloop driver whether we can stop or should check again
    timeout = self._fn()

    if timeout is None:
      # Yes, can stop mainloop
      return True

    # Schedule another check in the future
    self._defer = utils.RunningTimeout(timeout, True)

    return False

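# Illustrative sketch (not part of the original module; _CheckPendingJobs and
# job_queue are hypothetical): a shutdown_wait_fn as expected by Mainloop.Run;
# returning a number defers shutdown, returning None lets the mainloop stop:
#
#   def _CheckPendingJobs():
#     if job_queue.HasPendingJobs():
#       return 5.0  # check again in five seconds
#     return None   # nothing pending, the mainloop may stop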

class Mainloop(object):
  """Generic mainloop for daemons

  @ivar scheduler: A sched.scheduler object, which can be used to register
    timed events

  """
  _SHUTDOWN_TIMEOUT_PRIORITY = -(sys.maxint - 1)

  def __init__(self):
    """Constructs a new Mainloop instance.

    """
    self._signal_wait = []
    self.scheduler = AsyncoreScheduler(time.time)

    # Resolve uid/gids used
    runtime.GetEnts()

  @utils.SignalHandled([signal.SIGCHLD])
  @utils.SignalHandled([signal.SIGTERM])
  @utils.SignalHandled([signal.SIGINT])
  def Run(self, shutdown_wait_fn=None, signal_handlers=None):
    """Runs the mainloop.

    @type shutdown_wait_fn: callable
    @param shutdown_wait_fn: Function to check whether loop can be terminated;
      B{important}: function must be idempotent and must return either None
      for shutting down or a timeout for another call
    @type signal_handlers: dict
    @param signal_handlers: signal->L{utils.SignalHandler} passed by decorator

    """
    assert isinstance(signal_handlers, dict) and \
           len(signal_handlers) > 0, \
           "Broken SignalHandled decorator"

    # Counter for received signals
    shutdown_signals = 0

    # Logic to wait for shutdown
    shutdown_waiter = None

    # Start actual main loop
    while True:
      if shutdown_signals == 1 and shutdown_wait_fn is not None:
        if shutdown_waiter is None:
          shutdown_waiter = _ShutdownCheck(shutdown_wait_fn)

        # Let mainloop driver decide if we can already abort
        if shutdown_waiter.CanShutdown():
          break

        # Re-evaluate in a second
        timeout = 1.0

      elif shutdown_signals >= 1:
        # Abort loop if more than one signal has been sent or no callback has
        # been given
        break

      else:
        # Wait forever on I/O events
        timeout = None

      if self.scheduler.empty():
        asyncore.loop(count=1, timeout=timeout, use_poll=True)
      else:
        try:
          self.scheduler.run(max_delay=timeout)
        except SchedulerBreakout:
          pass

      # Check whether a signal was raised
      for (sig, handler) in signal_handlers.items():
        if handler.called:
          self._CallSignalWaiters(sig)
          if sig in (signal.SIGTERM, signal.SIGINT):
            logging.info("Received signal %s asking for shutdown", sig)
            shutdown_signals += 1
          handler.Clear()

  def _CallSignalWaiters(self, signum):
    """Calls all signal waiters for a certain signal.

    @type signum: int
    @param signum: Signal number

    """
    for owner in self._signal_wait:
      owner.OnSignal(signum)

  def RegisterSignal(self, owner):
    """Registers a receiver for signal notifications

    The receiver must support an "OnSignal(self, signum)" function.

    @type owner: instance
    @param owner: Receiver

    """
    self._signal_wait.append(owner)

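# Illustrative sketch (not part of the original module; ChildReaper is a
# hypothetical name): an object registered via RegisterSignal only needs an
# OnSignal method, which is called for every signal handled by the mainloop:
#
#   class ChildReaper(object):
#     def OnSignal(self, signum):
#       if signum == signal.SIGCHLD:
#         try:
#           # reap any exited child without blocking
#           os.waitpid(-1, os.WNOHANG)
#         except OSError:
#           pass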

def _VerifyDaemonUser(daemon_name):
  """Verifies the process uid matches the configured uid.

  This function verifies that a daemon is started as the user it is
  intended to be run as.

  @param daemon_name: The name of daemon to be started
  @return: A tuple with the first item indicating success or not,
           the second item the current uid and the third the expected uid

  """
  getents = runtime.GetEnts()
  running_uid = os.getuid()
  daemon_uids = {
    constants.MASTERD: getents.masterd_uid,
    constants.RAPI: getents.rapi_uid,
    constants.NODED: getents.noded_uid,
    constants.CONFD: getents.confd_uid,
    }
  assert daemon_name in daemon_uids, "Invalid daemon %s" % daemon_name

  return (daemon_uids[daemon_name] == running_uid, running_uid,
          daemon_uids[daemon_name])


def _BeautifyError(err):
  """Try to format an error better.

  Since we're dealing with daemon startup errors, in many cases these will
  be due to socket errors and the like, so we try to format those cases better.

  @param err: an exception object
  @rtype: string
  @return: the formatted error description

  """
  try:
    if isinstance(err, socket.error):
      return "Socket-related error: %s (errno=%s)" % (err.args[1], err.args[0])
    elif isinstance(err, EnvironmentError):
      if err.filename is None:
        return "%s (errno=%s)" % (err.strerror, err.errno)
      else:
        return "%s (file %s) (errno=%s)" % (err.strerror, err.filename,
                                            err.errno)
    else:
      return str(err)
  except Exception: # pylint: disable=W0703
    logging.exception("Error while handling existing error %s", err)
    return "%s" % str(err)


def _HandleSigHup(reopen_fn, signum, frame): # pylint: disable=W0613
  """Handler for SIGHUP.

  @param reopen_fn: List of callback functions for reopening log files

  """
  logging.info("Reopening log files after receiving SIGHUP")

  for fn in reopen_fn:
    if fn:
      fn()


def GenericMain(daemon_name, optionparser,
                check_fn, prepare_fn, exec_fn,
                multithreaded=False, console_logging=False,
                default_ssl_cert=None, default_ssl_key=None,
                warn_breach=False):
  """Shared main function for daemons.

  @type daemon_name: string
  @param daemon_name: daemon name
  @type optionparser: optparse.OptionParser
  @param optionparser: initialized optionparser with daemon-specific options
                       (common -f -d options will be handled by this module)
  @type check_fn: function which accepts (options, args)
  @param check_fn: function that checks start conditions and exits if they're
                   not met
  @type prepare_fn: function which accepts (options, args)
  @param prepare_fn: function that is run before forking, or None;
      its result will be passed as the third parameter to exec_fn, or
      if None was passed in, we will just pass None to exec_fn
  @type exec_fn: function which accepts (options, args, prepare_results)
  @param exec_fn: function that's executed with the daemon's pid file held, and
                  runs the daemon itself.
  @type multithreaded: bool
  @param multithreaded: Whether the daemon uses threads
  @type console_logging: boolean
  @param console_logging: if True, the daemon will fall back to the system
                          console if logging fails
  @type default_ssl_cert: string
  @param default_ssl_cert: Default SSL certificate path
  @type default_ssl_key: string
  @param default_ssl_key: Default SSL key path
  @type warn_breach: bool
  @param warn_breach: issue a warning at daemon launch time, before
      daemonizing, about the possibility of breaking parameter privacy
      invariants through the otherwise helpful debug logging.

  """
  optionparser.add_option("-f", "--foreground", dest="fork",
                          help="Don't detach from the current terminal",
                          default=True, action="store_false")
  optionparser.add_option("-d", "--debug", dest="debug",
                          help="Enable some debug messages",
                          default=False, action="store_true")
  optionparser.add_option("--syslog", dest="syslog",
                          help="Enable logging to syslog (except debug"
                          " messages); one of 'no', 'yes' or 'only' [%s]" %
                          constants.SYSLOG_USAGE,
                          default=constants.SYSLOG_USAGE,
                          choices=["no", "yes", "only"])

  family = ssconf.SimpleStore().GetPrimaryIPFamily()
  # family will default to AF_INET if there is no ssconf file (e.g. when
  # upgrading a cluster from 2.2 -> 2.3). This is intended, as Ganeti clusters
  # <= 2.2 cannot be AF_INET6
  if daemon_name in constants.DAEMONS_PORTS:
    default_bind_address = constants.IP4_ADDRESS_ANY
    if family == netutils.IP6Address.family:
      default_bind_address = constants.IP6_ADDRESS_ANY

    default_port = netutils.GetDaemonPort(daemon_name)

    # For networked daemons we allow choosing the port and bind address
    optionparser.add_option("-p", "--port", dest="port",
                            help="Network port (default: %s)" % default_port,
                            default=default_port, type="int")
    optionparser.add_option("-b", "--bind", dest="bind_address",
                            help=("Bind address (default: '%s')" %
                                  default_bind_address),
                            default=default_bind_address, metavar="ADDRESS")
    optionparser.add_option("-i", "--interface", dest="bind_interface",
                            help=("Bind interface"), metavar="INTERFACE")

  if default_ssl_key is not None and default_ssl_cert is not None:
    optionparser.add_option("--no-ssl", dest="ssl",
                            help="Do not secure HTTP protocol with SSL",
                            default=True, action="store_false")
    optionparser.add_option("-K", "--ssl-key", dest="ssl_key",
                            help=("SSL key path (default: %s)" %
                                  default_ssl_key),
                            default=default_ssl_key, type="string",
                            metavar="SSL_KEY_PATH")
    optionparser.add_option("-C", "--ssl-cert", dest="ssl_cert",
                            help=("SSL certificate path (default: %s)" %
                                  default_ssl_cert),
                            default=default_ssl_cert, type="string",
                            metavar="SSL_CERT_PATH")

  # Disable the use of fork(2) if the daemon uses threads
  if multithreaded:
    utils.DisableFork()

  options, args = optionparser.parse_args()

  if getattr(options, "bind_interface", None) is not None:
    if options.bind_address != default_bind_address:
      msg = ("Can't specify both, bind address (%s) and bind interface (%s)" %
             (options.bind_address, options.bind_interface))
      print >> sys.stderr, msg
      sys.exit(constants.EXIT_FAILURE)
    interface_ip_addresses = \
      netutils.GetInterfaceIpAddresses(options.bind_interface)
    if family == netutils.IP6Address.family:
      if_addresses = interface_ip_addresses[constants.IP6_VERSION]
    else:
      if_addresses = interface_ip_addresses[constants.IP4_VERSION]
    if len(if_addresses) < 1:
      msg = "Failed to find IP for interface %s" % options.bind_interace
      print >> sys.stderr, msg
      sys.exit(constants.EXIT_FAILURE)
    options.bind_address = if_addresses[0]

  if getattr(options, "ssl", False):
    ssl_paths = {
      "certificate": options.ssl_cert,
      "key": options.ssl_key,
      }

    for name, path in ssl_paths.iteritems():
      if not os.path.isfile(path):
        print >> sys.stderr, "SSL %s file '%s' was not found" % (name, path)
        sys.exit(constants.EXIT_FAILURE)

    # TODO: By initiating http.HttpSslParams here we would only read the files
    # once and have a proper validation (isfile returns False on directories)
    # at the same time.

  result, running_uid, expected_uid = _VerifyDaemonUser(daemon_name)
  if not result:
    msg = ("%s started using wrong user ID (%d), expected %d" %
           (daemon_name, running_uid, expected_uid))
    print >> sys.stderr, msg
    sys.exit(constants.EXIT_FAILURE)

  if check_fn is not None:
    check_fn(options, args)

  log_filename = constants.DAEMONS_LOGFILES[daemon_name]

  # node-daemon logging in lib/http/server.py, _HandleServerRequestInner
  if options.debug and warn_breach:
    sys.stderr.write(constants.DEBUG_MODE_CONFIDENTIALITY_WARNING % daemon_name)

  if options.fork:
    # Newer GnuTLS versions (>= 3.3.0) use a library constructor for
    # initialization and open /dev/urandom on library load time, way before we
    # fork(). Closing /dev/urandom causes subsequent ganeti.http.client
    # requests to fail and the process to receive a SIGABRT. As we cannot
    # reliably detect GnuTLS's socket, we work our way around this by keeping
    # all fds referring to /dev/urandom open.
    noclose_fds = []
    for fd in os.listdir("/proc/self/fd"):
      try:
        if os.readlink(os.path.join("/proc/self/fd", fd)) == "/dev/urandom":
          noclose_fds.append(int(fd))
      except EnvironmentError:
        # The fd might have disappeared (although it shouldn't as we're running
        # single-threaded).
        continue

    utils.CloseFDs(noclose_fds=noclose_fds)
    (wpipe, stdio_reopen_fn) = utils.Daemonize(logfile=log_filename)
  else:
    (wpipe, stdio_reopen_fn) = (None, None)

  log_reopen_fn = \
    utils.SetupLogging(log_filename, daemon_name,
                       debug=options.debug,
                       stderr_logging=not options.fork,
                       multithreaded=multithreaded,
                       syslog=options.syslog,
                       console_logging=console_logging)

  # Reopen log file(s) on SIGHUP
  signal.signal(signal.SIGHUP,
                compat.partial(_HandleSigHup, [log_reopen_fn, stdio_reopen_fn]))

  try:
    utils.WritePidFile(utils.DaemonPidFileName(daemon_name))
  except errors.PidFileLockError, err:
    print >> sys.stderr, "Error while locking PID file:\n%s" % err
    sys.exit(constants.EXIT_FAILURE)

  try:
    try:
      logging.info("%s daemon startup", daemon_name)
      if callable(prepare_fn):
        prep_results = prepare_fn(options, args)
      else:
        prep_results = None
    except Exception, err:
      utils.WriteErrorToFD(wpipe, _BeautifyError(err))
      raise

    if wpipe is not None:
      # we're done with the preparation phase, we close the pipe to
      # let the parent know it's safe to exit
      os.close(wpipe)

    exec_fn(options, args, prep_results)
  finally:
    utils.RemoveFile(utils.DaemonPidFileName(daemon_name))
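

# Illustrative sketch (not part of the original module; the Check/Prep/Exec
# callbacks and the ssl_* paths are hypothetical): a daemon's entry point
# typically just wires its callbacks into GenericMain:
#
#   import optparse
#
#   def Main():
#     parser = optparse.OptionParser(description="Ganeti node daemon",
#                                    usage="%prog [-f] [-d] [-p port]")
#     GenericMain(constants.NODED, parser, CheckNoded, PrepNoded, ExecNoded,
#                 default_ssl_cert=ssl_cert_path,
#                 default_ssl_key=ssl_key_path)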