daemon.py 23.6 KB
Newer Older
1
2
3
#
#

4
# Copyright (C) 2006, 2007, 2008, 2010, 2011 Google Inc.
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module with helper classes and functions for daemons"""


25
import asyncore
26
import asynchat
27
import collections
28
import os
29
import signal
30
import logging
31
32
import sched
import time
33
import socket
34
import select
35
import sys
36
37

from ganeti import utils
38
from ganeti import constants
39
from ganeti import errors
40
from ganeti import netutils
41
from ganeti import ssconf
42
from ganeti import runtime
43
from ganeti import compat
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78


class SchedulerBreakout(Exception):
  """Raised to abort a scheduler.run() invocation early.

  The asyncore-integrated delay function raises this instead of returning,
  so that the main loop regains control between events (e.g. to check for
  signals) before running the scheduler again.

  """


def AsyncoreDelayFunction(timeout):
  """Asyncore-compatible scheduler delay function.

  This is a delay function for sched that, rather than actually sleeping,
  executes asyncore events happening in the meantime.

  After an event has occurred, rather than returning, it raises a
  SchedulerBreakout exception, which will force the current scheduler.run()
  invocation to terminate, so that we can also check for signals. The main loop
  will then call the scheduler run again, which will allow it to actually
  process any due events.

  This is needed because scheduler.run() doesn't support a count=..., as
  asyncore loop, and the scheduler module documents throwing exceptions from
  inside the delay function as an allowed usage model.

  @type timeout: float
  @param timeout: maximum time to spend dispatching asyncore events

  """
  # Dispatch at most one round of asyncore events, then break out of
  # scheduler.run() via the exception so the caller regains control.
  asyncore.loop(timeout=timeout, count=1, use_poll=True)
  raise SchedulerBreakout()


class AsyncoreScheduler(sched.scheduler):
  """Event scheduler integrated with asyncore

  Uses L{AsyncoreDelayFunction} as the delay function, so asyncore events
  are processed while waiting for the next scheduled event.

  """
  def __init__(self, timefunc):
    """Initializes this scheduler.

    @param timefunc: function returning the current time (e.g. time.time)

    """
    sched.scheduler.__init__(self, timefunc, AsyncoreDelayFunction)
79
80


81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
class GanetiBaseAsyncoreDispatcher(asyncore.dispatcher):
  """Base Ganeti Asyncore Dispacher

  Provides shared behavior for all Ganeti dispatchers: errors are logged
  rather than propagated, and sockets are considered non-writable unless a
  subclass says otherwise.

  """
  # this method is overriding an asyncore.dispatcher method
  def writable(self):
    """Most of the time we don't want to check for writability.

    """
    return False

  # this method is overriding an asyncore.dispatcher method
  def handle_error(self):
    """Log an error in handling any request, and proceed.

    """
    logging.exception("Error while handling asyncore request")


Guido Trotter's avatar
Guido Trotter committed
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
class AsyncStreamServer(GanetiBaseAsyncoreDispatcher):
  """A stream server to use with asyncore.

  Each request is accepted, and then dispatched to a separate asyncore
  dispatcher to handle.

  """

  # Backlog passed to listen()
  _REQUEST_QUEUE_SIZE = 5

  def __init__(self, family, address):
    """Constructor for AsyncUnixStreamSocket

    @type family: integer
    @param family: socket family (one of socket.AF_*)
    @type address: address family dependent
    @param address: address to bind the socket to

    """
    GanetiBaseAsyncoreDispatcher.__init__(self)
    self.family = family
    self.create_socket(self.family, socket.SOCK_STREAM)
    self.set_reuse_addr()
    self.bind(address)
    self.listen(self._REQUEST_QUEUE_SIZE)

  # this method is overriding an asyncore.dispatcher method
  def handle_accept(self):
    """Accept a new client connection.

    Creates a new instance of the handler class, which will use asyncore to
    serve the client.

    """
    result = utils.IgnoreSignals(self.accept)
    if result is None:
      # accept() was interrupted or returned nothing; wait for the next event
      return
    (conn, peer) = result
    if self.family == socket.AF_UNIX:
      # override the client address, as for unix sockets nothing meaningful
      # is passed in from accept anyway
      peer = netutils.GetSocketCredentials(conn)
    logging.info("Accepted connection from %s",
                 netutils.FormatAddress(peer, family=self.family))
    self.handle_connection(conn, peer)

  def handle_connection(self, connected_socket, client_address):
    """Handle an already accepted connection.

    """
    raise NotImplementedError


152
153
154
155
156
157
158
class AsyncTerminatedMessageStream(asynchat.async_chat):
  """A terminator separated message stream asyncore module.

  Handles a stream connection receiving messages terminated by a defined
  separator. For each complete message handle_message is called.

  """
  def __init__(self, connected_socket, peer_address, terminator, family,
               unhandled_limit):
    """AsyncTerminatedMessageStream constructor.

    @type connected_socket: socket.socket
    @param connected_socket: connected stream socket to receive messages from
    @param peer_address: family-specific peer address
    @type terminator: string
    @param terminator: terminator separating messages in the stream
    @type family: integer
    @param family: socket family
    @type unhandled_limit: integer or None
    @param unhandled_limit: maximum unanswered messages

    """
    # python 2.4/2.5 uses conn=... while 2.6 has sock=... we have to cheat by
    # using a positional argument rather than a keyword one.
    asynchat.async_chat.__init__(self, connected_socket)
    self.connected_socket = connected_socket
    # on python 2.4 there is no "family" attribute for the socket class
    # FIXME: when we move to python 2.5 or above remove the family parameter
    #self.family = self.connected_socket.family
    self.family = family
    self.peer_address = peer_address
    self.terminator = terminator
    self.unhandled_limit = unhandled_limit
    self.set_terminator(terminator)
    # Chunks of the current, not-yet-terminated incoming message
    self.ibuffer = []
    # receive_count counts messages received, send_count messages answered;
    # their difference is the number of outstanding (unanswered) messages,
    # which _can_handle_message compares against unhandled_limit
    self.receive_count = 0
    self.send_count = 0
    # Outgoing replies waiting to be pushed (filled by send_message)
    self.oqueue = collections.deque()
    # Received messages deferred until we're allowed to handle them
    self.iqueue = collections.deque()

  # this method is overriding an asynchat.async_chat method
  def collect_incoming_data(self, data):
    # Accumulate raw chunks; they are joined in found_terminator
    self.ibuffer.append(data)

  def _can_handle_message(self):
    # True when a new message may be delivered to handle_message: either no
    # limit is configured, or (fewer than unhandled_limit messages are
    # outstanding AND nothing is already queued ahead of this one).
    # Note the grouping: "A or (B and C)".
    return (self.unhandled_limit is None or
            (self.receive_count < self.send_count + self.unhandled_limit) and
             not self.iqueue)

  # this method is overriding an asynchat.async_chat method
  def found_terminator(self):
    message = "".join(self.ibuffer)
    self.ibuffer = []
    # message_id is the sequence number of this message in the stream
    message_id = self.receive_count
    # We need to increase the receive_count after checking if the message can
    # be handled, but before calling handle_message
    can_handle = self._can_handle_message()
    self.receive_count += 1
    if can_handle:
      self.handle_message(message, message_id)
    else:
      # Over the limit (or messages already queued): defer in arrival order
      self.iqueue.append((message, message_id))

  def handle_message(self, message, message_id):
    """Handle a terminated message.

    @type message: string
    @param message: message to handle
    @type message_id: integer
    @param message_id: stream's message sequence number

    """
    pass
    # TODO: move this method to raise NotImplementedError
    # raise NotImplementedError

  def send_message(self, message):
    """Send a message to the remote peer. This function is thread-safe.

    @type message: string
    @param message: message to send, without the terminator

    @warning: If calling this function from a thread different than the one
    performing the main asyncore loop, remember that you have to wake that one
    up.

    """
    # If we just append the message we received to the output queue, this
    # function can be safely called by multiple threads at the same time, and
    # we don't need locking, since deques are thread safe. handle_write in the
    # asyncore thread will handle the next input message if there are any
    # enqueued.
    self.oqueue.append(message)

  # this method is overriding an asyncore.dispatcher method
  def readable(self):
    # read from the socket if we can handle the next requests
    return self._can_handle_message() and asynchat.async_chat.readable(self)

  # this method is overriding an asyncore.dispatcher method
  def writable(self):
    # the output queue may become full just after we called writable. This only
    # works if we know we'll have something else waking us up from the select,
    # in such case, anyway.
    return asynchat.async_chat.writable(self) or self.oqueue

  # this method is overriding an asyncore.dispatcher method
  def handle_write(self):
    if self.oqueue:
      # if we have data in the output queue, then send_message was called.
      # this means we can process one more message from the input queue, if
      # there are any.
      data = self.oqueue.popleft()
      self.push(data + self.terminator)
      self.send_count += 1
      if self.iqueue:
        # A reply went out, so one deferred message may now be handled
        self.handle_message(*self.iqueue.popleft())
    self.initiate_send()

  def close_log(self):
    # Log the disconnection, then close the socket
    logging.info("Closing connection from %s",
                 netutils.FormatAddress(self.peer_address, family=self.family))
    self.close()

  # this method is overriding an asyncore.dispatcher method
  def handle_expt(self):
    self.close_log()

  # this method is overriding an asyncore.dispatcher method
  def handle_error(self):
    """Log an error in handling any request, and proceed.

    """
    logging.exception("Error while handling asyncore request")
    self.close_log()


289
class AsyncUDPSocket(GanetiBaseAsyncoreDispatcher):
  """An improved asyncore udp socket.

  """
  def __init__(self, family):
    """Constructor for AsyncUDPSocket

    @type family: integer
    @param family: socket family (e.g. socket.AF_INET)

    """
    GanetiBaseAsyncoreDispatcher.__init__(self)
    # Queue of (ip, port, payload) datagrams waiting to be sent; a deque is
    # used so that removing the head in handle_write is O(1) instead of the
    # O(n) of list.pop(0)
    self._out_queue = collections.deque()
    self._family = family
    self.create_socket(family, socket.SOCK_DGRAM)

  # this method is overriding an asyncore.dispatcher method
  def handle_connect(self):
    # Python thinks that the first udp message from a source qualifies as a
    # "connect" and further ones are part of the same connection. We beg to
    # differ and treat all messages equally.
    pass

  # this method is overriding an asyncore.dispatcher method
  def handle_read(self):
    """Read one datagram and dispatch it to L{handle_datagram}.

    """
    recv_result = utils.IgnoreSignals(self.recvfrom,
                                      constants.MAX_UDP_DATA_SIZE)
    if recv_result is not None:
      payload, address = recv_result
      if self._family == socket.AF_INET6:
        # we ignore 'flow info' and 'scope id' as we don't need them
        ip, port, _, _ = address
      else:
        ip, port = address

      self.handle_datagram(payload, ip, port)

  def handle_datagram(self, payload, ip, port):
    """Handle an already read udp datagram

    """
    raise NotImplementedError

  # this method is overriding an asyncore.dispatcher method
  def writable(self):
    # We should check whether we can write to the socket only if we have
    # something scheduled to be written
    return bool(self._out_queue)

  # this method is overriding an asyncore.dispatcher method
  def handle_write(self):
    """Send the datagram at the head of the output queue.

    """
    if not self._out_queue:
      logging.error("handle_write called with empty output queue")
      return
    (ip, port, payload) = self._out_queue[0]
    utils.IgnoreSignals(self.sendto, payload, 0, (ip, port))
    # Only dequeue after the send attempt, mirroring the peek-then-pop order
    self._out_queue.popleft()

  def enqueue_send(self, ip, port, payload):
    """Enqueue a datagram to be sent when possible

    @raise errors.UdpDataSizeError: if the payload is too large

    """
    if len(payload) > constants.MAX_UDP_DATA_SIZE:
      raise errors.UdpDataSizeError('Packet too big: %s > %s' % (len(payload),
                                    constants.MAX_UDP_DATA_SIZE))
    self._out_queue.append((ip, port, payload))

  def process_next_packet(self, timeout=0):
    """Process the next datagram, waiting for it if necessary.

    @type timeout: float
    @param timeout: how long to wait for data
    @rtype: boolean
    @return: True if some data has been handled, False otherwise

    """
    result = utils.WaitForFdCondition(self, select.POLLIN, timeout)
    if result is not None and result & select.POLLIN:
      self.handle_read()
      return True
    else:
      return False

369

Guido Trotter's avatar
Guido Trotter committed
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
class AsyncAwaker(GanetiBaseAsyncoreDispatcher):
  """A way to notify the asyncore loop that something is going on.

  If an asyncore daemon is multithreaded when a thread tries to push some data
  to a socket, the main loop handling asynchronous requests might be sleeping
  waiting on a select(). To avoid this it can create an instance of the
  AsyncAwaker, which other threads can use to wake it up.

  """
  def __init__(self, signal_fn=None):
    """Constructor for AsyncAwaker

    @type signal_fn: function
    @param signal_fn: function to call when awaken

    """
    GanetiBaseAsyncoreDispatcher.__init__(self)
    # Use identity comparison for None (PEP 8), not "== None"
    assert signal_fn is None or callable(signal_fn)
    # The in/out socket pair forms a one-way wakeup channel: other threads
    # write to out_socket, the asyncore loop reads from in_socket
    (self.in_socket, self.out_socket) = socket.socketpair(socket.AF_UNIX,
                                                          socket.SOCK_STREAM)
    self.in_socket.setblocking(0)
    self.in_socket.shutdown(socket.SHUT_WR)
    self.out_socket.shutdown(socket.SHUT_RD)
    self.set_socket(self.in_socket)
    self.need_signal = True
    self.signal_fn = signal_fn
    self.connected = True

  # this method is overriding an asyncore.dispatcher method
  def handle_read(self):
    # Drain any wakeup tokens, then invoke the callback (if any) and re-arm
    utils.IgnoreSignals(self.recv, 4096)
    if self.signal_fn:
      self.signal_fn()
    self.need_signal = True

  # this method is overriding an asyncore.dispatcher method
  def close(self):
    asyncore.dispatcher.close(self)
    self.out_socket.close()

  def signal(self):
    """Signal the asyncore main loop.

    Any data we send here will be ignored, but it will cause the select() call
    to return.

    """
    # Yes, there is a race condition here. No, we don't care, at worst we're
    # sending more than one wakeup token, which doesn't harm at all.
    if self.need_signal:
      self.need_signal = False
      self.out_socket.send("\0")


424
425
426
class Mainloop(object):
  """Generic mainloop for daemons

  @ivar scheduler: A sched.scheduler object, which can be used to register
    timed events

  """
  def __init__(self):
    """Constructs a new Mainloop instance.

    """
    # Objects registered via RegisterSignal, notified on every signal
    self._signal_wait = []
    self.scheduler = AsyncoreScheduler(time.time)

    # Resolve uid/gids used
    runtime.GetEnts()

  # Each decorator installs a handler for its signal for the duration of the
  # call and passes all of them in via the signal_handlers dict
  @utils.SignalHandled([signal.SIGCHLD])
  @utils.SignalHandled([signal.SIGTERM])
  @utils.SignalHandled([signal.SIGINT])
  def Run(self, signal_handlers=None):
    """Runs the mainloop.

    @type signal_handlers: dict
    @param signal_handlers: signal->L{utils.SignalHandler} passed by decorator

    """
    assert isinstance(signal_handlers, dict) and \
           len(signal_handlers) > 0, \
           "Broken SignalHandled decorator"
    running = True

    # Start actual main loop
    while running:
      # With scheduled events pending, run the scheduler (which processes
      # asyncore events in its delay function and raises SchedulerBreakout
      # after one round); otherwise run a single asyncore iteration directly
      if not self.scheduler.empty():
        try:
          self.scheduler.run()
        except SchedulerBreakout:
          pass
      else:
        asyncore.loop(count=1, use_poll=True)

      # Check whether a signal was raised
      for sig in signal_handlers:
        handler = signal_handlers[sig]
        if handler.called:
          self._CallSignalWaiters(sig)
          # SIGTERM/SIGINT terminate the loop; other signals (SIGCHLD) don't
          running = sig not in (signal.SIGTERM, signal.SIGINT)
          handler.Clear()

  def _CallSignalWaiters(self, signum):
    """Calls all signal waiters for a certain signal.

    @type signum: int
    @param signum: Signal number

    """
    for owner in self._signal_wait:
      owner.OnSignal(signum)

  def RegisterSignal(self, owner):
    """Registers a receiver for signal notifications

    The receiver must support a "OnSignal(self, signum)" function.

    @type owner: instance
    @param owner: Receiver

    """
    self._signal_wait.append(owner)
494

495

496
497
498
def _VerifyDaemonUser(daemon_name):
  """Verifies the process uid matches the configured uid.

  This method verifies that a daemon is started as the user it is
  intended to be run

  @param daemon_name: The name of daemon to be started
  @return: A tuple with the first item indicating success or not,
           the second item current uid and third with expected uid

  """
  getents = runtime.GetEnts()
  # Map each known daemon to the uid it is supposed to run under
  expected_uids = {
    constants.MASTERD: getents.masterd_uid,
    constants.RAPI: getents.rapi_uid,
    constants.NODED: getents.noded_uid,
    constants.CONFD: getents.confd_uid,
    }
  expected_uid = expected_uids[daemon_name]
  running_uid = os.getuid()

  return (expected_uid == running_uid, running_uid, expected_uid)


Iustin Pop's avatar
Iustin Pop committed
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
def _BeautifyError(err):
  """Try to format an error better.

  Since we're dealing with daemon startup errors, in many cases this
  will be due to socket error and such, so we try to format these cases better.

  @param err: an exception object
  @rtype: string
  @return: the formatted error description

  """
  try:
    if isinstance(err, socket.error):
      return "Socket-related error: %s (errno=%s)" % (err.args[1], err.args[0])
    elif isinstance(err, EnvironmentError):
      if err.filename is None:
        return "%s (errno=%s)" % (err.strerror, err.errno)
      else:
        return "%s (file %s) (errno=%s)" % (err.strerror, err.filename,
                                            err.errno)
    else:
      return str(err)
  except Exception: # pylint: disable-msg=W0703
    logging.exception("Error while handling existing error %s", err)
    return "%s" % str(err)


547
def _HandleSigHup(reopen_fn, signum, frame): # pylint: disable-msg=W0613
548
549
  """Handler for SIGHUP.

550
  @param reopen_fn: List of callback functions for reopening log files
551
552
553

  """
  logging.info("Reopening log files after receiving SIGHUP")
554
555
556
557

  for fn in reopen_fn:
    if fn:
      fn()
558
559


Iustin Pop's avatar
Iustin Pop committed
560
561
def GenericMain(daemon_name, optionparser,
                check_fn, prepare_fn, exec_fn,
                multithreaded=False, console_logging=False,
                default_ssl_cert=None, default_ssl_key=None):
  """Shared main function for daemons.

  Parses common command line options, verifies the running user, optionally
  daemonizes, sets up logging and the pid file, then runs the daemon's
  exec_fn.

  @type daemon_name: string
  @param daemon_name: daemon name
  @type optionparser: optparse.OptionParser
  @param optionparser: initialized optionparser with daemon-specific options
                       (common -f -d options will be handled by this module)
  @type check_fn: function which accepts (options, args)
  @param check_fn: function that checks start conditions and exits if they're
                   not met
  @type prepare_fn: function which accepts (options, args)
  @param prepare_fn: function that is run before forking, or None;
      it's result will be passed as the third parameter to exec_fn, or
      if None was passed in, we will just pass None to exec_fn
  @type exec_fn: function which accepts (options, args, prepare_results)
  @param exec_fn: function that's executed with the daemon's pid file held, and
                  runs the daemon itself.
  @type multithreaded: bool
  @param multithreaded: Whether the daemon uses threads
  @type console_logging: boolean
  @param console_logging: if True, the daemon will fall back to the system
                          console if logging fails
  @type default_ssl_cert: string
  @param default_ssl_cert: Default SSL certificate path
  @type default_ssl_key: string
  @param default_ssl_key: Default SSL key path

  """
  # Options common to every daemon
  optionparser.add_option("-f", "--foreground", dest="fork",
                          help="Don't detach from the current terminal",
                          default=True, action="store_false")
  optionparser.add_option("-d", "--debug", dest="debug",
                          help="Enable some debug messages",
                          default=False, action="store_true")
  optionparser.add_option("--syslog", dest="syslog",
                          help="Enable logging to syslog (except debug"
                          " messages); one of 'no', 'yes' or 'only' [%s]" %
                          constants.SYSLOG_USAGE,
                          default=constants.SYSLOG_USAGE,
                          choices=["no", "yes", "only"])

  if daemon_name in constants.DAEMONS_PORTS:
    default_bind_address = constants.IP4_ADDRESS_ANY
    family = ssconf.SimpleStore().GetPrimaryIPFamily()
    # family will default to AF_INET if there is no ssconf file (e.g. when
    # upgrading a cluster from 2.2 -> 2.3. This is intended, as Ganeti clusters
    # <= 2.2 can not be AF_INET6
    if family == netutils.IP6Address.family:
      default_bind_address = constants.IP6_ADDRESS_ANY

    default_port = netutils.GetDaemonPort(daemon_name)

    # For networked daemons we allow choosing the port and bind address
    optionparser.add_option("-p", "--port", dest="port",
                            help="Network port (default: %s)" % default_port,
                            default=default_port, type="int")
    optionparser.add_option("-b", "--bind", dest="bind_address",
                            help=("Bind address (default: '%s')" %
                                  default_bind_address),
                            default=default_bind_address, metavar="ADDRESS")

  # SSL options are only offered when both defaults are provided
  if default_ssl_key is not None and default_ssl_cert is not None:
    optionparser.add_option("--no-ssl", dest="ssl",
                            help="Do not secure HTTP protocol with SSL",
                            default=True, action="store_false")
    optionparser.add_option("-K", "--ssl-key", dest="ssl_key",
                            help=("SSL key path (default: %s)" %
                                  default_ssl_key),
                            default=default_ssl_key, type="string",
                            metavar="SSL_KEY_PATH")
    optionparser.add_option("-C", "--ssl-cert", dest="ssl_cert",
                            help=("SSL certificate path (default: %s)" %
                                  default_ssl_cert),
                            default=default_ssl_cert, type="string",
                            metavar="SSL_CERT_PATH")

  # Disable the use of fork(2) if the daemon uses threads
  if multithreaded:
    utils.DisableFork()

  options, args = optionparser.parse_args()

  # Verify that the SSL files exist before going any further
  if getattr(options, "ssl", False):
    ssl_paths = {
      "certificate": options.ssl_cert,
      "key": options.ssl_key,
      }

    for name, path in ssl_paths.iteritems():
      if not os.path.isfile(path):
        print >> sys.stderr, "SSL %s file '%s' was not found" % (name, path)
        sys.exit(constants.EXIT_FAILURE)

    # TODO: By initiating http.HttpSslParams here we would only read the files
    # once and have a proper validation (isfile returns False on directories)
    # at the same time.

  result, running_uid, expected_uid = _VerifyDaemonUser(daemon_name)
  if not result:
    msg = ("%s started using wrong user ID (%d), expected %d" %
           (daemon_name, running_uid, expected_uid))
    print >> sys.stderr, msg
    sys.exit(constants.EXIT_FAILURE)

  if check_fn is not None:
    check_fn(options, args)

  if options.fork:
    # Detach from the terminal; wpipe lets the child report startup errors
    # back to the parent before it exits
    utils.CloseFDs()
    (wpipe, stdio_reopen_fn) = \
      utils.Daemonize(logfile=constants.DAEMONS_LOGFILES[daemon_name])
  else:
    (wpipe, stdio_reopen_fn) = (None, None)

  log_reopen_fn = \
    utils.SetupLogging(constants.DAEMONS_LOGFILES[daemon_name], daemon_name,
                       debug=options.debug,
                       stderr_logging=not options.fork,
                       multithreaded=multithreaded,
                       syslog=options.syslog,
                       console_logging=console_logging)

  # Reopen log file(s) on SIGHUP
  signal.signal(signal.SIGHUP,
                compat.partial(_HandleSigHup, [log_reopen_fn, stdio_reopen_fn]))

  utils.WritePidFile(utils.DaemonPidFileName(daemon_name))
  try:
    try:
      logging.info("%s daemon startup", daemon_name)
      if callable(prepare_fn):
        prep_results = prepare_fn(options, args)
      else:
        prep_results = None
    except Exception, err:
      # Report the preparation failure to the waiting parent, then re-raise
      utils.WriteErrorToFD(wpipe, _BeautifyError(err))
      raise

    if wpipe is not None:
      # we're done with the preparation phase, we close the pipe to
      # let the parent know it's safe to exit
      os.close(wpipe)

    exec_fn(options, args, prep_results)
  finally:
    # Always remove the pid file, even if exec_fn raised
    utils.RemoveFile(utils.DaemonPidFileName(daemon_name))