daemon.py 23.5 KB
Newer Older
1
2
3
#
#

4
# Copyright (C) 2006, 2007, 2008, 2010 Google Inc.
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module with helper classes and functions for daemons"""


25
import asyncore
26
import asynchat
27
import collections
28
import os
29
import signal
30
import logging
31
32
import sched
import time
33
import socket
34
import select
35
import sys
36
37

from ganeti import utils
38
from ganeti import constants
39
from ganeti import errors
40
from ganeti import netutils
41
from ganeti import ssconf
42
from ganeti import runtime
43
from ganeti import compat
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78


class SchedulerBreakout(Exception):
  """Exception used to get out of the scheduler loop

  """


def AsyncoreDelayFunction(timeout):
  """Asyncore-compatible scheduler delay function.

  This is a delay function for sched that, rather than actually sleeping,
  executes asyncore events happening in the meantime.

  After an event has occurred, rather than returning, it raises a
  SchedulerBreakout exception, which will force the current scheduler.run()
  invocation to terminate, so that we can also check for signals. The main loop
  will then call the scheduler run again, which will allow it to actually
  process any due events.

  This is needed because scheduler.run() doesn't support a count=..., as
  asyncore loop, and the scheduler module documents throwing exceptions from
  inside the delay function as an allowed usage model.

  """
  asyncore.loop(timeout=timeout, count=1, use_poll=True)
  raise SchedulerBreakout()


class AsyncoreScheduler(sched.scheduler):
  """Event scheduler integrated with asyncore

  """
  def __init__(self, timefunc):
    sched.scheduler.__init__(self, timefunc, AsyncoreDelayFunction)
79
80


81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
class GanetiBaseAsyncoreDispatcher(asyncore.dispatcher):
  """Base Ganeti Asyncore Dispacher

  """
  # this method is overriding an asyncore.dispatcher method
  def handle_error(self):
    """Log an error in handling any request, and proceed.

    """
    logging.exception("Error while handling asyncore request")

  # this method is overriding an asyncore.dispatcher method
  def writable(self):
    """Most of the time we don't want to check for writability.

    """
    return False


Guido Trotter's avatar
Guido Trotter committed
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
class AsyncStreamServer(GanetiBaseAsyncoreDispatcher):
  """A stream server to use with asyncore.

  Each request is accepted, and then dispatched to a separate asyncore
  dispatcher to handle.

  """

  _REQUEST_QUEUE_SIZE = 5

  def __init__(self, family, address):
    """Constructor for AsyncUnixStreamSocket

    @type family: integer
    @param family: socket family (one of socket.AF_*)
    @type address: address family dependent
    @param address: address to bind the socket to

    """
    GanetiBaseAsyncoreDispatcher.__init__(self)
    self.family = family
    self.create_socket(self.family, socket.SOCK_STREAM)
    self.set_reuse_addr()
    self.bind(address)
    self.listen(self._REQUEST_QUEUE_SIZE)

  # this method is overriding an asyncore.dispatcher method
  def handle_accept(self):
    """Accept a new client connection.

    Creates a new instance of the handler class, which will use asyncore to
    serve the client.

    """
    accept_result = utils.IgnoreSignals(self.accept)
    if accept_result is not None:
      connected_socket, client_address = accept_result
      if self.family == socket.AF_UNIX:
        # override the client address, as for unix sockets nothing meaningful
        # is passed in from accept anyway
140
        client_address = netutils.GetSocketCredentials(connected_socket)
Guido Trotter's avatar
Guido Trotter committed
141
      logging.info("Accepted connection from %s",
142
                   netutils.FormatAddress(client_address, family=self.family))
Guido Trotter's avatar
Guido Trotter committed
143
144
145
146
147
148
149
150
151
      self.handle_connection(connected_socket, client_address)

  def handle_connection(self, connected_socket, client_address):
    """Handle an already accepted connection.

    """
    raise NotImplementedError


152
153
154
155
156
157
158
class AsyncTerminatedMessageStream(asynchat.async_chat):
  """A terminator separated message stream asyncore module.

  Handles a stream connection receiving messages terminated by a defined
  separator. For each complete message handle_message is called.

  """
159
160
  def __init__(self, connected_socket, peer_address, terminator, family,
               unhandled_limit):
161
162
163
164
165
166
167
168
169
    """AsyncTerminatedMessageStream constructor.

    @type connected_socket: socket.socket
    @param connected_socket: connected stream socket to receive messages from
    @param peer_address: family-specific peer address
    @type terminator: string
    @param terminator: terminator separating messages in the stream
    @type family: integer
    @param family: socket family
170
171
    @type unhandled_limit: integer or None
    @param unhandled_limit: maximum unanswered messages
172
173
174
175
176
177
178
179
180
181
182
183

    """
    # python 2.4/2.5 uses conn=... while 2.6 has sock=... we have to cheat by
    # using a positional argument rather than a keyword one.
    asynchat.async_chat.__init__(self, connected_socket)
    self.connected_socket = connected_socket
    # on python 2.4 there is no "family" attribute for the socket class
    # FIXME: when we move to python 2.5 or above remove the family parameter
    #self.family = self.connected_socket.family
    self.family = family
    self.peer_address = peer_address
    self.terminator = terminator
184
    self.unhandled_limit = unhandled_limit
185
186
    self.set_terminator(terminator)
    self.ibuffer = []
187
188
    self.receive_count = 0
    self.send_count = 0
189
    self.oqueue = collections.deque()
190
    self.iqueue = collections.deque()
191
192
193
194
195

  # this method is overriding an asynchat.async_chat method
  def collect_incoming_data(self, data):
    self.ibuffer.append(data)

196
197
198
199
200
  def _can_handle_message(self):
    return (self.unhandled_limit is None or
            (self.receive_count < self.send_count + self.unhandled_limit) and
             not self.iqueue)

201
202
203
204
  # this method is overriding an asynchat.async_chat method
  def found_terminator(self):
    message = "".join(self.ibuffer)
    self.ibuffer = []
205
206
207
208
209
210
211
212
213
    message_id = self.receive_count
    # We need to increase the receive_count after checking if the message can
    # be handled, but before calling handle_message
    can_handle = self._can_handle_message()
    self.receive_count += 1
    if can_handle:
      self.handle_message(message, message_id)
    else:
      self.iqueue.append((message, message_id))
214
215
216
217
218
219
220
221
222
223
224
225
226
227

  def handle_message(self, message, message_id):
    """Handle a terminated message.

    @type message: string
    @param message: message to handle
    @type message_id: integer
    @param message_id: stream's message sequence number

    """
    pass
    # TODO: move this method to raise NotImplementedError
    # raise NotImplementedError

228
229
230
231
232
233
234
235
236
237
238
239
240
  def send_message(self, message):
    """Send a message to the remote peer. This function is thread-safe.

    @type message: string
    @param message: message to send, without the terminator

    @warning: If calling this function from a thread different than the one
    performing the main asyncore loop, remember that you have to wake that one
    up.

    """
    # If we just append the message we received to the output queue, this
    # function can be safely called by multiple threads at the same time, and
241
242
243
    # we don't need locking, since deques are thread safe. handle_write in the
    # asyncore thread will handle the next input message if there are any
    # enqueued.
244
245
    self.oqueue.append(message)

246
247
248
249
250
  # this method is overriding an asyncore.dispatcher method
  def readable(self):
    # read from the socket if we can handle the next requests
    return self._can_handle_message() and asynchat.async_chat.readable(self)

251
252
253
254
255
256
257
258
259
260
  # this method is overriding an asyncore.dispatcher method
  def writable(self):
    # the output queue may become full just after we called writable. This only
    # works if we know we'll have something else waking us up from the select,
    # in such case, anyway.
    return asynchat.async_chat.writable(self) or self.oqueue

  # this method is overriding an asyncore.dispatcher method
  def handle_write(self):
    if self.oqueue:
261
262
263
      # if we have data in the output queue, then send_message was called.
      # this means we can process one more message from the input queue, if
      # there are any.
264
265
      data = self.oqueue.popleft()
      self.push(data + self.terminator)
266
267
268
      self.send_count += 1
      if self.iqueue:
        self.handle_message(*self.iqueue.popleft())
269
270
    self.initiate_send()

271
272
  def close_log(self):
    logging.info("Closing connection from %s",
273
                 netutils.FormatAddress(self.peer_address, family=self.family))
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
    self.close()

  # this method is overriding an asyncore.dispatcher method
  def handle_expt(self):
    self.close_log()

  # this method is overriding an asyncore.dispatcher method
  def handle_error(self):
    """Log an error in handling any request, and proceed.

    """
    logging.exception("Error while handling asyncore request")
    self.close_log()


289
class AsyncUDPSocket(GanetiBaseAsyncoreDispatcher):
290
291
292
  """An improved asyncore udp socket.

  """
Manuel Franceschini's avatar
Manuel Franceschini committed
293
  def __init__(self, family):
294
295
296
    """Constructor for AsyncUDPSocket

    """
297
    GanetiBaseAsyncoreDispatcher.__init__(self)
298
    self._out_queue = []
Manuel Franceschini's avatar
Manuel Franceschini committed
299
300
    self._family = family
    self.create_socket(family, socket.SOCK_DGRAM)
301
302
303
304
305
306
307
308
309
310

  # this method is overriding an asyncore.dispatcher method
  def handle_connect(self):
    # Python thinks that the first udp message from a source qualifies as a
    # "connect" and further ones are part of the same connection. We beg to
    # differ and treat all messages equally.
    pass

  # this method is overriding an asyncore.dispatcher method
  def handle_read(self):
311
312
313
314
    recv_result = utils.IgnoreSignals(self.recvfrom,
                                      constants.MAX_UDP_DATA_SIZE)
    if recv_result is not None:
      payload, address = recv_result
Manuel Franceschini's avatar
Manuel Franceschini committed
315
316
317
318
319
320
      if self._family == socket.AF_INET6:
        # we ignore 'flow info' and 'scope id' as we don't need them
        ip, port, _, _ = address
      else:
        ip, port = address

321
      self.handle_datagram(payload, ip, port)
322
323
324
325
326
327
328
329
330
331
332
333
334

  def handle_datagram(self, payload, ip, port):
    """Handle an already read udp datagram

    """
    raise NotImplementedError

  # this method is overriding an asyncore.dispatcher method
  def writable(self):
    # We should check whether we can write to the socket only if we have
    # something scheduled to be written
    return bool(self._out_queue)

335
  # this method is overriding an asyncore.dispatcher method
336
  def handle_write(self):
Guido Trotter's avatar
Guido Trotter committed
337
338
339
340
    if not self._out_queue:
      logging.error("handle_write called with empty output queue")
      return
    (ip, port, payload) = self._out_queue[0]
Guido Trotter's avatar
Guido Trotter committed
341
    utils.IgnoreSignals(self.sendto, payload, 0, (ip, port))
Guido Trotter's avatar
Guido Trotter committed
342
343
    self._out_queue.pop(0)

344
345
346
347
  def enqueue_send(self, ip, port, payload):
    """Enqueue a datagram to be sent when possible

    """
348
349
350
    if len(payload) > constants.MAX_UDP_DATA_SIZE:
      raise errors.UdpDataSizeError('Packet too big: %s > %s' % (len(payload),
                                    constants.MAX_UDP_DATA_SIZE))
351
352
    self._out_queue.append((ip, port, payload))

353
354
355
356
357
358
359
360
361
  def process_next_packet(self, timeout=0):
    """Process the next datagram, waiting for it if necessary.

    @type timeout: float
    @param timeout: how long to wait for data
    @rtype: boolean
    @return: True if some data has been handled, False otherwise

    """
362
363
    result = utils.WaitForFdCondition(self, select.POLLIN, timeout)
    if result is not None and result & select.POLLIN:
Guido Trotter's avatar
Guido Trotter committed
364
      self.handle_read()
365
366
367
368
      return True
    else:
      return False

369

Guido Trotter's avatar
Guido Trotter committed
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
class AsyncAwaker(GanetiBaseAsyncoreDispatcher):
  """A way to notify the asyncore loop that something is going on.

  If an asyncore daemon is multithreaded when a thread tries to push some data
  to a socket, the main loop handling asynchronous requests might be sleeping
  waiting on a select(). To avoid this it can create an instance of the
  AsyncAwaker, which other threads can use to wake it up.

  """
  def __init__(self, signal_fn=None):
    """Constructor for AsyncAwaker

    @type signal_fn: function
    @param signal_fn: function to call when awaken

    """
    GanetiBaseAsyncoreDispatcher.__init__(self)
    assert signal_fn == None or callable(signal_fn)
    (self.in_socket, self.out_socket) = socket.socketpair(socket.AF_UNIX,
                                                          socket.SOCK_STREAM)
    self.in_socket.setblocking(0)
391
392
    self.in_socket.shutdown(socket.SHUT_WR)
    self.out_socket.shutdown(socket.SHUT_RD)
Guido Trotter's avatar
Guido Trotter committed
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
    self.set_socket(self.in_socket)
    self.need_signal = True
    self.signal_fn = signal_fn
    self.connected = True

  # this method is overriding an asyncore.dispatcher method
  def handle_read(self):
    utils.IgnoreSignals(self.recv, 4096)
    if self.signal_fn:
      self.signal_fn()
    self.need_signal = True

  # this method is overriding an asyncore.dispatcher method
  def close(self):
    asyncore.dispatcher.close(self)
    self.out_socket.close()

  def signal(self):
    """Signal the asyncore main loop.

    Any data we send here will be ignored, but it will cause the select() call
    to return.

    """
    # Yes, there is a race condition here. No, we don't care, at worst we're
    # sending more than one wakeup token, which doesn't harm at all.
    if self.need_signal:
      self.need_signal = False
      self.out_socket.send("\0")


424
425
426
class Mainloop(object):
  """Generic mainloop for daemons

427
428
429
  @ivar scheduler: A sched.scheduler object, which can be used to register
    timed events

430
431
  """
  def __init__(self):
432
433
434
    """Constructs a new Mainloop instance.

    """
435
    self._signal_wait = []
436
    self.scheduler = AsyncoreScheduler(time.time)
437

438
439
  @utils.SignalHandled([signal.SIGCHLD])
  @utils.SignalHandled([signal.SIGTERM])
440
  @utils.SignalHandled([signal.SIGINT])
441
  def Run(self, signal_handlers=None):
442
443
    """Runs the mainloop.

444
445
    @type signal_handlers: dict
    @param signal_handlers: signal->L{utils.SignalHandler} passed by decorator
446
447

    """
448
449
450
451
452
453
    assert isinstance(signal_handlers, dict) and \
           len(signal_handlers) > 0, \
           "Broken SignalHandled decorator"
    running = True
    # Start actual main loop
    while running:
454
455
456
457
458
459
460
      if not self.scheduler.empty():
        try:
          self.scheduler.run()
        except SchedulerBreakout:
          pass
      else:
        asyncore.loop(count=1, use_poll=True)
461
462
463
464
465
466

      # Check whether a signal was raised
      for sig in signal_handlers:
        handler = signal_handlers[sig]
        if handler.called:
          self._CallSignalWaiters(sig)
467
          running = sig not in (signal.SIGTERM, signal.SIGINT)
468
          handler.Clear()
Guido Trotter's avatar
Guido Trotter committed
469

470
471
472
473
474
475
476
477
  def _CallSignalWaiters(self, signum):
    """Calls all signal waiters for a certain signal.

    @type signum: int
    @param signum: Signal number

    """
    for owner in self._signal_wait:
478
      owner.OnSignal(signum)
479
480
481
482
483
484
485
486
487
488
489

  def RegisterSignal(self, owner):
    """Registers a receiver for signal notifications

    The receiver must support a "OnSignal(self, signum)" function.

    @type owner: instance
    @param owner: Receiver

    """
    self._signal_wait.append(owner)
490

491

492
493
494
def _VerifyDaemonUser(daemon_name):
  """Verifies the process uid matches the configured uid.

Iustin Pop's avatar
Iustin Pop committed
495
496
  This method verifies that a daemon is started as the user it is
  intended to be run
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515

  @param daemon_name: The name of daemon to be started
  @return: A tuple with the first item indicating success or not,
           the second item current uid and third with expected uid

  """
  getents = runtime.GetEnts()
  running_uid = os.getuid()
  daemon_uids = {
    constants.MASTERD: getents.masterd_uid,
    constants.RAPI: getents.rapi_uid,
    constants.NODED: getents.noded_uid,
    constants.CONFD: getents.confd_uid,
    }

  return (daemon_uids[daemon_name] == running_uid, running_uid,
          daemon_uids[daemon_name])


Iustin Pop's avatar
Iustin Pop committed
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
def _BeautifyError(err):
  """Try to format an error better.

  Since we're dealing with daemon startup errors, in many cases this
  will be due to socket error and such, so we try to format these cases better.

  @param err: an exception object
  @rtype: string
  @return: the formatted error description

  """
  try:
    if isinstance(err, socket.error):
      return "Socket-related error: %s (errno=%s)" % (err.args[1], err.args[0])
    elif isinstance(err, EnvironmentError):
      if err.filename is None:
        return "%s (errno=%s)" % (err.strerror, err.errno)
      else:
        return "%s (file %s) (errno=%s)" % (err.strerror, err.filename,
                                            err.errno)
    else:
      return str(err)
  except Exception: # pylint: disable-msg=W0703
    logging.exception("Error while handling existing error %s", err)
    return "%s" % str(err)


543
544
545
546
547
548
549
550
551
552
553
def _HandleSigHup(reopen_cb, signum, frame): # pylint: disable-msg=W0613
  """Handler for SIGHUP.

  @param reopen_cb: Callback function for reopening log files

  """
  assert callable(reopen_cb)
  logging.info("Reopening log files after receiving SIGHUP")
  reopen_cb()


Iustin Pop's avatar
Iustin Pop committed
554
555
def GenericMain(daemon_name, optionparser,
                check_fn, prepare_fn, exec_fn,
Luca Bigliardi's avatar
Luca Bigliardi committed
556
                multithreaded=False, console_logging=False,
557
                default_ssl_cert=None, default_ssl_key=None):
558
559
560
561
  """Shared main function for daemons.

  @type daemon_name: string
  @param daemon_name: daemon name
562
  @type optionparser: optparse.OptionParser
563
564
565
566
567
  @param optionparser: initialized optionparser with daemon-specific options
                       (common -f -d options will be handled by this module)
  @type check_fn: function which accepts (options, args)
  @param check_fn: function that checks start conditions and exits if they're
                   not met
Iustin Pop's avatar
Iustin Pop committed
568
569
570
571
572
  @type prepare_fn: function which accepts (options, args)
  @param prepare_fn: function that is run before forking, or None;
      it's result will be passed as the third parameter to exec_fn, or
      if None was passed in, we will just pass None to exec_fn
  @type exec_fn: function which accepts (options, args, prepare_results)
573
574
  @param exec_fn: function that's executed with the daemon's pid file held, and
                  runs the daemon itself.
575
576
  @type multithreaded: bool
  @param multithreaded: Whether the daemon uses threads
577
578
579
  @type console_logging: boolean
  @param console_logging: if True, the daemon will fall back to the system
                          console if logging fails
580
581
582
583
  @type default_ssl_cert: string
  @param default_ssl_cert: Default SSL certificate path
  @type default_ssl_key: string
  @param default_ssl_key: Default SSL key path
584
585
586
587
588
589
590
591

  """
  optionparser.add_option("-f", "--foreground", dest="fork",
                          help="Don't detach from the current terminal",
                          default=True, action="store_false")
  optionparser.add_option("-d", "--debug", dest="debug",
                          help="Enable some debug messages",
                          default=False, action="store_true")
592
593
594
595
596
597
  optionparser.add_option("--syslog", dest="syslog",
                          help="Enable logging to syslog (except debug"
                          " messages); one of 'no', 'yes' or 'only' [%s]" %
                          constants.SYSLOG_USAGE,
                          default=constants.SYSLOG_USAGE,
                          choices=["no", "yes", "only"])
598

599
  if daemon_name in constants.DAEMONS_PORTS:
600
    default_bind_address = constants.IP4_ADDRESS_ANY
601
602
603
604
605
606
    family = ssconf.SimpleStore().GetPrimaryIPFamily()
    # family will default to AF_INET if there is no ssconf file (e.g. when
    # upgrading a cluster from 2.2 -> 2.3. This is intended, as Ganeti clusters
    # <= 2.2 can not be AF_INET6
    if family == netutils.IP6Address.family:
      default_bind_address = constants.IP6_ADDRESS_ANY
607

608
    default_port = netutils.GetDaemonPort(daemon_name)
609
610

    # For networked daemons we allow choosing the port and bind address
611
    optionparser.add_option("-p", "--port", dest="port",
612
613
                            help="Network port (default: %s)" % default_port,
                            default=default_port, type="int")
614
    optionparser.add_option("-b", "--bind", dest="bind_address",
615
                            help=("Bind address (default: '%s')" %
616
617
                                  default_bind_address),
                            default=default_bind_address, metavar="ADDRESS")
618

619
  if default_ssl_key is not None and default_ssl_cert is not None:
620
621
622
623
    optionparser.add_option("--no-ssl", dest="ssl",
                            help="Do not secure HTTP protocol with SSL",
                            default=True, action="store_false")
    optionparser.add_option("-K", "--ssl-key", dest="ssl_key",
624
625
626
627
                            help=("SSL key path (default: %s)" %
                                  default_ssl_key),
                            default=default_ssl_key, type="string",
                            metavar="SSL_KEY_PATH")
628
    optionparser.add_option("-C", "--ssl-cert", dest="ssl_cert",
629
630
631
632
                            help=("SSL certificate path (default: %s)" %
                                  default_ssl_cert),
                            default=default_ssl_cert, type="string",
                            metavar="SSL_CERT_PATH")
633

634
  # Disable the use of fork(2) if the daemon uses threads
635
636
  if multithreaded:
    utils.DisableFork()
637
638
639

  options, args = optionparser.parse_args()

640
641
642
643
644
645
646
647
648
  if getattr(options, "ssl", False):
    ssl_paths = {
      "certificate": options.ssl_cert,
      "key": options.ssl_key,
      }

    for name, path in ssl_paths.iteritems():
      if not os.path.isfile(path):
        print >> sys.stderr, "SSL %s file '%s' was not found" % (name, path)
649
650
        sys.exit(constants.EXIT_FAILURE)

651
652
653
654
    # TODO: By initiating http.HttpSslParams here we would only read the files
    # once and have a proper validation (isfile returns False on directories)
    # at the same time.

655
656
657
658
659
660
661
  result, running_uid, expected_uid = _VerifyDaemonUser(daemon_name)
  if not result:
    msg = ("%s started using wrong user ID (%d), expected %d" %
           (daemon_name, running_uid, expected_uid))
    print >> sys.stderr, msg
    sys.exit(constants.EXIT_FAILURE)

662
663
664
  if check_fn is not None:
    check_fn(options, args)

665
666
  if options.fork:
    utils.CloseFDs()
667
668
669
    wpipe = utils.Daemonize(logfile=constants.DAEMONS_LOGFILES[daemon_name])
  else:
    wpipe = None
670

671
672
673
674
675
676
677
678
679
680
681
  log_reopen_fn = \
    utils.SetupLogging(constants.DAEMONS_LOGFILES[daemon_name], daemon_name,
                       debug=options.debug,
                       stderr_logging=not options.fork,
                       multithreaded=multithreaded,
                       syslog=options.syslog,
                       console_logging=console_logging)

  # Reopen log file(s) on SIGHUP
  signal.signal(signal.SIGHUP, compat.partial(_HandleSigHup, log_reopen_fn))

682
  utils.WritePidFile(utils.DaemonPidFileName(daemon_name))
683
  try:
684
685
686
687
688
689
690
    try:
      if callable(prepare_fn):
        prep_results = prepare_fn(options, args)
      else:
        prep_results = None
      logging.info("%s daemon startup", daemon_name)
    except Exception, err:
691
      utils.WriteErrorToFD(wpipe, _BeautifyError(err))
692
693
694
695
696
697
      raise

    if wpipe is not None:
      # we're done with the preparation phase, we close the pipe to
      # let the parent know it's safe to exit
      os.close(wpipe)
Iustin Pop's avatar
Iustin Pop committed
698
699

    exec_fn(options, args, prep_results)
700
  finally:
Michael Hanselmann's avatar
Michael Hanselmann committed
701
    utils.RemoveFile(utils.DaemonPidFileName(daemon_name))