diff --git a/Makefile.am b/Makefile.am index 84229b5fdfb829cdaecc516cbdd7d04c14b57013..9e67492448ceed6568c94302b73efb7291f28ca9 100644 --- a/Makefile.am +++ b/Makefile.am @@ -130,6 +130,7 @@ http_PYTHON = \ confd_PYTHON = \ lib/confd/__init__.py \ + lib/confd/client.py \ lib/confd/server.py \ lib/confd/querylib.py diff --git a/lib/confd/client.py b/lib/confd/client.py new file mode 100644 index 0000000000000000000000000000000000000000..1243bf726fd6d561ceaae96c8e820794dd2215a2 --- /dev/null +++ b/lib/confd/client.py @@ -0,0 +1,208 @@ +# +# + +# Copyright (C) 2009 Google Inc. +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, but +# WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA +# 02110-1301, USA. + + +"""Ganeti confd client + +""" +import socket +import time +import random + +from ganeti import utils +from ganeti import constants +from ganeti import objects +from ganeti import serializer +from ganeti import daemon # contains AsyncUDPSocket +from ganeti import errors +from ganeti import confd + + +class ConfdAsyncUDPClient(daemon.AsyncUDPSocket): + """Confd udp asyncore client + + This is kept separate from the main ConfdClient to make sure it's easy to + implement a non-asyncore based client library. + + """ + def __init__(self, client): + """Constructor for ConfdAsyncUDPClient + + @type client: L{ConfdClient} + @param client: client library, to pass the datagrams to + + """ + daemon.AsyncUDPSocket.__init__(self) + self.client = client + + # this method is overriding a daemon.AsyncUDPSocket method + def handle_datagram(self, payload, ip, port): + self.client.HandleResponse(payload, ip, port) + + +class ConfdClient: + """Send queries to confd, and get back answers. + + Since the confd model works by querying multiple master candidates, and + getting back answers, this is an asynchronous library. It can either work + through asyncore or with your own handling. + + """ + def __init__(self, hmac_key, peers): + """Constructor for ConfdClient + + @type hmac_key: string + @param hmac_key: hmac key to talk to confd + @type peers: list + @param peers: list of peer nodes + + """ + if not isinstance(peers, list): + raise errors.ProgrammerError("peers must be a list") + + self._peers = peers + self._hmac_key = hmac_key + self._socket = ConfdAsyncUDPClient(self) + self._callbacks = {} + self._expire_callbacks = [] + self._confd_port = utils.GetDaemonPort(constants.CONFD) + + def _PackRequest(self, request, now=None): + """Prepare a request to be sent on the wire. + + This function puts a proper salt in a confd request, puts the proper salt, + and adds the correct magic number. + + """ + if now is None: + now = time.time() + tstamp = '%d' % now + req = serializer.DumpSignedJson(request.ToDict(), self._hmac_key, tstamp) + return confd.PackMagic(req) + + def _UnpackReply(self, payload): + in_payload = confd.UnpackMagic(payload) + (answer, salt) = serializer.LoadSignedJson(in_payload, self._hmac_key) + return answer, salt + + def _ExpireCallbacks(self): + """Delete all the expired callbacks. + + """ + now = time.time() + while self._expire_callbacks: + expire_time, rsalt = self._expire_callbacks[0] + if now >= expire_time: + self._expire_callbacks.pop() + del self._callbacks[rsalt] + else: + break + + def SendRequest(self, request, callback, args, coverage=None): + """Send a confd request to some MCs + + @type request: L{objects.ConfdRequest} + @param request: the request to send + @type callback: f(answer, req_type, req_query, salt, ip, port, args) + @param callback: answer callback + @type args: tuple + @param args: additional callback arguments + @type coverage: integer + @keyword coverage: number of remote nodes to contact + + """ + if coverage is None: + coverage = min(len(self._peers), constants.CONFD_DEFAULT_REQ_COVERAGE) + + if not callable(callback): + raise errors.ConfdClientError("callback must be callable") + + if coverage > len(self._peers): + raise errors.ConfdClientError("Not enough MCs known to provide the" + " desired coverage") + + if not request.rsalt: + raise errors.ConfdClientError("Missing request rsalt") + + self._ExpireCallbacks() + if request.rsalt in self._callbacks: + raise errors.ConfdClientError("Duplicate request rsalt") + + if request.type not in constants.CONFD_REQS: + raise errors.ConfdClientError("Invalid request type") + + random.shuffle(self._peers) + targets = self._peers[:coverage] + + now = time.time() + payload = self._PackRequest(request, now=now) + + for target in targets: + try: + self._socket.enqueue_send(target, self._confd_port, payload) + except errors.UdpDataSizeError: + raise errors.ConfdClientError("Request too big") + + self._callbacks[request.rsalt] = (callback, request.type, + request.query, args) + expire_time = now + constants.CONFD_CLIENT_EXPIRE_TIMEOUT + self._expire_callbacks.append((expire_time, request.rsalt)) + + def HandleResponse(self, payload, ip, port): + """Asynchronous handler for a confd reply + + Call the relevant callback associated to the current request. + + """ + try: + try: + answer, salt = self._UnpackReply(payload) + except (errors.SignatureError, errors.ConfdMagicError): + return + + try: + (callback, type, query, args) = self._callbacks[salt] + except KeyError: + # If the salt is unkown the answer is probably a replay of an old + # expired query. Ignoring it. + pass + else: + callback(answer, type, query, salt, ip, port, args) + + finally: + self._ExpireCallbacks() + + +class ConfdClientRequest(objects.ConfdRequest): + """This is the client-side version of ConfdRequest. + + This version of the class helps creating requests, on the client side, by + filling in some default values. + + """ + def __init__(self, **kwargs): + objects.ConfdRequest.__init__(self, **kwargs) + if not self.rsalt: + self.rsalt = utils.NewUUID() + if not self.protocol: + self.protocol = constants.CONFD_PROTOCOL_VERSION + if self.type not in constants.CONFD_REQS: + raise errors.ConfdClientError("Invalid request type") + diff --git a/lib/constants.py b/lib/constants.py index f233f47d2bd56339f1afe3eead543de6c048b5ce..ee03bb25b5217dc5810a842540e02ea616f7e7a0 100644 --- a/lib/constants.py +++ b/lib/constants.py @@ -688,6 +688,16 @@ CONFD_CONFIG_RELOAD_RATELIMIT = 2 # compressed, or move away from json. CONFD_MAGIC_FOURCC = 'plj0' +# By default a confd request is sent to the minimum between this number and all +# MCs. 6 was chosen because even in the case of a disastrous 50% response rate, +# we should have enough answers to be able to compare more than one. +CONFD_DEFAULT_REQ_COVERAGE = 6 + +# Timeout in seconds to expire pending query request in the confd client +# library. We don't actually expect any answer more than 10 seconds after we +# sent a request. +CONFD_CLIENT_EXPIRE_TIMEOUT = 10 + # Maximum UDP datagram size. # On IPv4: 64K - 20 (ip header size) - 8 (udp header size) = 65507 # On IPv6: 64K - 40 (ip6 header size) - 8 (udp header size) = 65487 diff --git a/lib/errors.py b/lib/errors.py index 9136c5eff0d4e15f5fe7f8210c7200988de1ebd6..9fd9868e665dbdd957ddd009723a6d670ba99f17 100644 --- a/lib/errors.py +++ b/lib/errors.py @@ -305,6 +305,14 @@ class ConfdMagicError(GenericError): """ +class ConfdClientError(GenericError): + """A magic fourcc error in Ganeti confd. + + Errors in the confd client library. + + """ + + class UdpDataSizeError(GenericError): """UDP payload too big.