diff --git a/lib/confd/client.py b/lib/confd/client.py index dba30ee0becc57a98c87c83fbcba0376c464a906..efcb68d3361272fc8d0e88f81d0d8f88f84b0336 100644 --- a/lib/confd/client.py +++ b/lib/confd/client.py @@ -102,9 +102,9 @@ class ConfdClient: @type callback: f(L{ConfdUpcallPayload}) @param callback: function to call when getting answers @type port: integer - @keyword port: confd port (default: use GetDaemonPort) + @param port: confd port (default: use GetDaemonPort) @type logger: logging.Logger - @keyword logger: optional logger for internal conditions + @param logger: optional logger for internal conditions """ if not callable(callback): @@ -176,15 +176,17 @@ class ConfdClient: else: break - def SendRequest(self, request, args=None, coverage=None): + def SendRequest(self, request, args=None, coverage=None, async=True): """Send a confd request to some MCs @type request: L{objects.ConfdRequest} @param request: the request to send @type args: tuple - @keyword args: additional callback arguments + @param args: additional callback arguments @type coverage: integer - @keyword coverage: number of remote nodes to contact + @param coverage: number of remote nodes to contact + @type async: boolean + @param async: handle the write asynchronously """ if coverage is None: @@ -220,6 +222,9 @@ class ConfdClient: expire_time = now + constants.CONFD_CLIENT_EXPIRE_TIMEOUT self._expire_requests.append((expire_time, request.rsalt)) + if not async: + self.FlushSendQueue() + def HandleResponse(self, payload, ip, port): """Asynchronous handler for a confd reply @@ -255,6 +260,26 @@ class ConfdClient: finally: self.ExpireRequests() + def FlushSendQueue(self): + """Send out all pending requests. + + Can be used for synchronous client use. + + """ + while self._socket.writable(): + self._socket.handle_write() + + def ReceiveReply(self, timeout=1): + """Receive one reply. + + @type timeout: float + @param timeout: how long to wait for the reply + @rtype: boolean + @return: True if some data has been handled, False otherwise + + """ + return self._socket.process_next_packet(timeout=timeout) + # UPCALL_REPLY: server reply upcall # has all ConfdUpcallPayload fields populated @@ -328,7 +353,7 @@ class ConfdFilterCallback: @type callback: f(L{ConfdUpcallPayload}) @param callback: function to call when getting answers @type logger: logging.Logger - @keyword logger: optional logger for internal conditions + @param logger: optional logger for internal conditions """ if not callable(callback): @@ -407,3 +432,65 @@ class ConfdFilterCallback: if not filter_upcall: self._callback(up) + + +class ConfdCountingCallback: + """Callback that calls another callback, and counts the answers + + """ + def __init__(self, callback, logger=None): + """Constructor for ConfdCountingCallback + + @type callback: f(L{ConfdUpcallPayload}) + @param callback: function to call when getting answers + @type logger: logging.Logger + @param logger: optional logger for internal conditions + + """ + if not callable(callback): + raise errors.ProgrammerError("callback must be callable") + + self._callback = callback + self._logger = logger + # answers contains a dict of salt -> count + self._answers = {} + + def RegisterQuery(self, salt): + if salt in self._answers: + raise errors.ProgrammerError("query already registered") + self._answers[salt] = 0 + + def AllAnswered(self): + """Have all the registered queries received at least an answer? + + """ + return utils.all(self._answers.values()) + + def _HandleExpire(self, up): + # if we have no answer we have received none, before the expiration. + if up.salt in self._answers: + del self._answers[up.salt] + + def _HandleReply(self, up): + """Handle a single confd reply, and decide whether to filter it. + + @rtype: boolean + @return: True if the reply should be filtered, False if it should be passed + on to the up-callback + + """ + if up.salt in self._answers: + self._answers[up.salt] += 1 + + def __call__(self, up): + """Filtering callback + + @type up: L{ConfdUpcallPayload} + @param up: upper callback + + """ + if up.type == UPCALL_REPLY: + self._HandleReply(up) + elif up.type == UPCALL_EXPIRE: + self._HandleExpire(up) + self._callback(up) diff --git a/lib/daemon.py b/lib/daemon.py index 1bc9bdc70046727617bb65618bc5c15f6aebcfae..5283fc515121cd82755db83a77c47a346c98cbad 100644 --- a/lib/daemon.py +++ b/lib/daemon.py @@ -30,6 +30,7 @@ import logging import sched import time import socket +import select import sys from ganeti import utils @@ -91,20 +92,23 @@ class AsyncUDPSocket(asyncore.dispatcher): # differ and treat all messages equally. pass + def do_read(self): + try: + payload, address = self.recvfrom(constants.MAX_UDP_DATA_SIZE) + except socket.error, err: + if err.errno == errno.EINTR: + # we got a signal while trying to read. no need to do anything, + # handle_read will be called again if there is data on the socket. + return + else: + raise + ip, port = address + self.handle_datagram(payload, ip, port) + # this method is overriding an asyncore.dispatcher method def handle_read(self): try: - try: - payload, address = self.recvfrom(constants.MAX_UDP_DATA_SIZE) - except socket.error, err: - if err.errno == errno.EINTR: - # we got a signal while trying to read. no need to do anything, - # handle_read will be called again if there is data on the socket. - return - else: - raise - ip, port = address - self.handle_datagram(payload, ip, port) + self.do_read() except: # pylint: disable-msg=W0702 # we need to catch any exception here, log it, but proceed, because even # if we failed handling a single request, we still want to continue. @@ -153,6 +157,21 @@ class AsyncUDPSocket(asyncore.dispatcher): constants.MAX_UDP_DATA_SIZE)) self._out_queue.append((ip, port, payload)) + def process_next_packet(self, timeout=0): + """Process the next datagram, waiting for it if necessary. + + @type timeout: float + @param timeout: how long to wait for data + @rtype: boolean + @return: True if some data has been handled, False otherwise + + """ + if utils.WaitForFdCondition(self, select.POLLIN, timeout) & select.POLLIN: + self.do_read() + return True + else: + return False + class Mainloop(object): """Generic mainloop for daemons diff --git a/lib/http/__init__.py b/lib/http/__init__.py index 965a297a24e0eb9eea30d63b0040cf27c23dcd47..2fc9cd20483b568924640e6b602dbdbfc75ee2e3 100644 --- a/lib/http/__init__.py +++ b/lib/http/__init__.py @@ -323,44 +323,6 @@ class HttpVersionNotSupported(HttpException): code = 505 -def WaitForSocketCondition(sock, event, timeout): - """Waits for a condition to occur on the socket. - - @type sock: socket - @param sock: Wait for events on this socket - @type event: int - @param event: ORed condition (see select module) - @type timeout: float or None - @param timeout: Timeout in seconds - @rtype: int or None - @return: None for timeout, otherwise occured conditions - - """ - check = (event | select.POLLPRI | - select.POLLNVAL | select.POLLHUP | select.POLLERR) - - if timeout is not None: - # Poller object expects milliseconds - timeout *= 1000 - - poller = select.poll() - poller.register(sock, event) - try: - while True: - # TODO: If the main thread receives a signal and we have no timeout, we - # could wait forever. This should check a global "quit" flag or - # something every so often. - io_events = poller.poll(timeout) - if not io_events: - # Timeout - return None - for (_, evcond) in io_events: - if evcond & check: - return evcond - finally: - poller.unregister(sock) - - def SocketOperation(sock, op, arg1, timeout): """Wrapper around socket functions. @@ -411,7 +373,7 @@ def SocketOperation(sock, op, arg1, timeout): else: wait_for_event = event_poll - event = WaitForSocketCondition(sock, wait_for_event, timeout) + event = utils.WaitForFdCondition(sock, wait_for_event, timeout) if event is None: raise HttpSocketTimeout() diff --git a/lib/http/client.py b/lib/http/client.py index ba27d1a5c107471549734b09e372bb98ba0a4621..655a209b20c62acedce53355741aa4ecd62782ad 100644 --- a/lib/http/client.py +++ b/lib/http/client.py @@ -37,6 +37,7 @@ import threading from ganeti import workerpool from ganeti import http +from ganeti import utils HTTP_CLIENT_THREADS = 10 @@ -249,8 +250,8 @@ class HttpClientRequestExecutor(http.HttpBase): if not connected: # Wait for connection - event = http.WaitForSocketCondition(self.sock, select.POLLOUT, - self.CONNECT_TIMEOUT) + event = utils.WaitForFdCondition(self.sock, select.POLLOUT, + self.CONNECT_TIMEOUT) if event is None: raise http.HttpError("Timeout while connecting to server") diff --git a/lib/utils.py b/lib/utils.py index cb7f39a155975cecdb43eeff4d6f5d7febcab274..cab0ffaa5d27c0085019510eb6361659d02b370b 100644 --- a/lib/utils.py +++ b/lib/utils.py @@ -1740,18 +1740,112 @@ def FirstFree(seq, base=0): return None -def all(seq, pred=bool): # pylint: disable-msg=W0622 - "Returns True if pred(x) is True for every element in the iterable" - for _ in itertools.ifilterfalse(pred, seq): +try: + all = all # pylint: disable-msg=W0622 +except NameError: + def all(seq, pred=bool): # pylint: disable-msg=W0622 + "Returns True if pred(x) is True for every element in the iterable" + for _ in itertools.ifilterfalse(pred, seq): + return False + return True + + +try: + any = any # pylint: disable-msg=W0622 +except NameError: + def any(seq, pred=bool): # pylint: disable-msg=W0622 + "Returns True if pred(x) is True for at least one element in the iterable" + for _ in itertools.ifilter(pred, seq): + return True return False - return True -def any(seq, pred=bool): # pylint: disable-msg=W0622 - "Returns True if pred(x) is True for at least one element in the iterable" - for _ in itertools.ifilter(pred, seq): - return True - return False +def SingleWaitForFdCondition(fdobj, event, timeout): + """Waits for a condition to occur on the socket. + + Immediately returns at the first interruption. + + @type fdobj: integer or object supporting a fileno() method + @param fdobj: entity to wait for events on + @type event: integer + @param event: ORed condition (see select module) + @type timeout: float or None + @param timeout: Timeout in seconds + @rtype: int or None + @return: None for timeout, otherwise occured conditions + + """ + check = (event | select.POLLPRI | + select.POLLNVAL | select.POLLHUP | select.POLLERR) + + if timeout is not None: + # Poller object expects milliseconds + timeout *= 1000 + + poller = select.poll() + poller.register(fdobj, event) + try: + # TODO: If the main thread receives a signal and we have no timeout, we + # could wait forever. This should check a global "quit" flag or something + # every so often. + io_events = poller.poll(timeout) + except select.error, err: + if err[0] != errno.EINTR: + raise + io_events = [] + if io_events and io_events[0][1] & check: + return io_events[0][1] + else: + return None + + +class FdConditionWaiterHelper(object): + """Retry helper for WaitForFdCondition. + + This class contains the retried and wait functions that make sure + WaitForFdCondition can continue waiting until the timeout is actually + expired. + + """ + + def __init__(self, timeout): + self.timeout = timeout + + def Poll(self, fdobj, event): + result = SingleWaitForFdCondition(fdobj, event, self.timeout) + if result is None: + raise RetryAgain() + else: + return result + + def UpdateTimeout(self, timeout): + self.timeout = timeout + + +def WaitForFdCondition(fdobj, event, timeout): + """Waits for a condition to occur on the socket. + + Retries until the timeout is expired, even if interrupted. + + @type fdobj: integer or object supporting a fileno() method + @param fdobj: entity to wait for events on + @type event: integer + @param event: ORed condition (see select module) + @type timeout: float or None + @param timeout: Timeout in seconds + @rtype: int or None + @return: None for timeout, otherwise occured conditions + + """ + if timeout is not None: + retrywaiter = FdConditionWaiterHelper(timeout) + result = Retry(retrywaiter.Poll, RETRY_REMAINING_TIME, timeout, + args=(fdobj, event), wait_fn=retrywaiter.UpdateTimeout) + else: + result = None + while result is None: + result = SingleWaitForFdCondition(fdobj, event, timeout) + return result def partition(seq, pred=bool): # # pylint: disable-msg=W0622 diff --git a/tools/burnin b/tools/burnin index 8d3c14c30abbe94b11bb94c4f1238b5b55696683..91aae6b5c06d5bbefb2a0b2f57c2470a4423ea5b 100755 --- a/tools/burnin +++ b/tools/burnin @@ -36,6 +36,9 @@ from ganeti import constants from ganeti import cli from ganeti import errors from ganeti import utils +from ganeti import ssconf + +from ganeti.confd import client as confd_client USAGE = ("\tburnin -o OS_NAME [options...] instance_name ...") @@ -164,6 +167,9 @@ OPTIONS = [ cli.cli_option("--no-nics", dest="nics", help="No network interfaces", action="store_const", const=[], default=[{}]), + cli.cli_option("--no-confd", dest="do_confd_tests", + help="Skip confd queries", + action="store_false", default=True), cli.cli_option("--rename", dest="rename", default=None, help=("Give one unused instance name which is taken" " to start the renaming sequence"), @@ -260,6 +266,7 @@ class Burner(object): self.hvp = self.bep = None self.ParseOptions() self.cl = cli.GetClient() + self.ss = ssconf.SimpleStore() self.GetState() def ClearFeedbackBuf(self): @@ -456,7 +463,7 @@ class Burner(object): socket.setdefaulttimeout(options.net_timeout) def GetState(self): - """Read the cluster state from the config.""" + """Read the cluster state from the master daemon.""" if self.opts.nodes: names = self.opts.nodes.split(",") else: @@ -486,6 +493,14 @@ class Burner(object): if not found: Err("OS '%s' not found" % self.opts.os) + cluster_info = self.cl.QueryClusterInfo() + self.cluster_info = cluster_info + if not self.cluster_info: + Err("Can't get cluster info") + + default_nic_params = self.cluster_info["nicparams"][constants.PP_DEFAULT] + self.cluster_default_nicparams = default_nic_params + @_DoCheckInstances @_DoBatch(False) def BurnCreateInstances(self): @@ -831,6 +846,67 @@ class Burner(object): Log("removing last NIC", indent=2) self.ExecOrQueue(instance, op_add, op_rem) + def ConfdCallback(self, reply): + """Callback for confd queries""" + if reply.type == confd_client.UPCALL_REPLY: + if reply.server_reply.status != constants.CONFD_REPL_STATUS_OK: + Err("Query %s gave non-ok status %s: %s" % (reply.orig_request, + reply.server_reply.status, + reply.server_reply)) + if reply.orig_request.type == constants.CONFD_REQ_PING: + Log("Ping: OK", indent=1) + elif reply.orig_request.type == constants.CONFD_REQ_CLUSTER_MASTER: + if reply.server_reply.answer == self.cluster_info["master"]: + Log("Master: OK", indent=1) + else: + Err("Master: wrong: %s" % reply.server_reply.answer) + elif reply.orig_request.type == constants.CONFD_REQ_NODE_ROLE_BYNAME: + if reply.server_reply.answer == constants.CONFD_NODE_ROLE_MASTER: + Log("Node role for master: OK", indent=1) + else: + Err("Node role for master: wrong: %s" % reply.server_reply.answer) + + def DoConfdRequestReply(self, req): + self.confd_counting_callback.RegisterQuery(req.rsalt) + self.confd_client.SendRequest(req, async=False) + while not self.confd_counting_callback.AllAnswered(): + if not self.confd_client.ReceiveReply(): + Err("Did not receive all expected confd replies") + break + + def BurnConfd(self): + """Run confd queries for our instances. + + The following confd queries are tested: + - CONFD_REQ_PING: simple ping + - CONFD_REQ_CLUSTER_MASTER: cluster master + - CONFD_REQ_NODE_ROLE_BYNAME: node role, for the master + + """ + Log("Checking confd results") + + hmac_key = utils.ReadFile(constants.CONFD_HMAC_KEY) + mc_file = self.ss.KeyToFilename(constants.SS_MASTER_CANDIDATES_IPS) + mc_list = utils.ReadFile(mc_file).splitlines() + filter_callback = confd_client.ConfdFilterCallback(self.ConfdCallback) + counting_callback = confd_client.ConfdCountingCallback(filter_callback) + self.confd_counting_callback = counting_callback + + self.confd_client = confd_client.ConfdClient(hmac_key, mc_list, + counting_callback) + + req = confd_client.ConfdClientRequest(type=constants.CONFD_REQ_PING) + self.DoConfdRequestReply(req) + + req = confd_client.ConfdClientRequest( + type=constants.CONFD_REQ_CLUSTER_MASTER) + self.DoConfdRequestReply(req) + + req = confd_client.ConfdClientRequest( + type=constants.CONFD_REQ_NODE_ROLE_BYNAME, + query=self.cluster_info["master"]) + self.DoConfdRequestReply(req) + def _CheckInstanceAlive(self, instance): """Check if an instance is alive by doing http checks. @@ -913,8 +989,14 @@ class Burner(object): if opts.do_addremove_disks: self.BurnAddRemoveDisks() + default_nic_mode = self.cluster_default_nicparams[constants.NIC_MODE] + # Don't add/remove nics in routed mode, as we would need an ip to add + # them with if opts.do_addremove_nics: - self.BurnAddRemoveNICs() + if default_nic_mode == constants.NIC_MODE_BRIDGED: + self.BurnAddRemoveNICs() + else: + Log("Skipping nic add/remove as the cluster is not in bridged mode") if opts.do_activate_disks: self.BurnActivateDisks() @@ -922,6 +1004,9 @@ class Burner(object): if opts.rename: self.BurnRename() + if opts.do_confd_tests: + self.BurnConfd() + if opts.do_startstop: self.BurnStopStart()