#
#

# Copyright (C) 2013, 2014 Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# 1. Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
# IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
# TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
# LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
# NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.


"""Module that defines a transport for RPC connections.

A transport can send to and receive messages from some endpoint.

"""

import collections
import errno
import io
import logging
import socket
import time

from ganeti import constants
import ganeti.errors
from ganeti import ssconf
from ganeti import utils
from ganeti.rpc import errors


# Default connect timeout; Transport uses it when no timeouts are passed
DEF_CTMO = constants.LUXI_DEF_CTMO
# Default read/write timeout; Transport uses it when no timeouts are passed
DEF_RWTO = constants.LUXI_DEF_RWTO


class Transport:
  """Low-level transport class.

  This is used on the client side.

  This could be replaced by any other class that provides the same
  semantics to the Client. This means:
    - can send messages and receive messages
    - safe for multithreading

  """

  def __init__(self, address, timeouts=None):
    """Constructor for the Transport class.

    Arguments:
      - address: a valid address for the used transport class (it is
        passed to C{connect} on a UNIX stream socket)
      - timeouts: a pair of timeouts, to be used on connect and read/write

    There are two timeouts used since we might want to wait for a long
    time for a response, but the connect timeout should be lower.

    If not passed, we use the defaults L{DEF_CTMO} and respectively
    L{DEF_RWTO}.

    Note that on reading data, since the timeout applies to an
    individual receive, it might be that the total duration is longer
    than timeout value passed (we make a hard limit at twice the read
    timeout).

    If anything fails while connecting, the socket is closed again and
    the exception is re-raised to the caller.

    """
    self.address = address
    if timeouts is None:
      self._ctimeout, self._rwtimeout = DEF_CTMO, DEF_RWTO
    else:
      self._ctimeout, self._rwtimeout = timeouts

    self.socket = None
    # Data received after the last message terminator seen so far
    self._buffer = ""
    # Complete messages received but not yet handed out by Recv
    self._msgs = collections.deque()

    try:
      self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)

      # Try to connect
      try:
        utils.Retry(self._Connect, 1.0, self._ctimeout,
                    args=(self.socket, address, self._ctimeout))
      except utils.RetryTimeout:
        raise errors.TimeoutError("Connect timed out")

      self.socket.settimeout(self._rwtimeout)
    except Exception:
      # Release the socket on every failure (including the timeout and
      # permission errors raised above), not only on socket errors, so a
      # failed constructor never leaves a descriptor open
      if self.socket is not None:
        self.socket.close()
      self.socket = None
      raise

  @staticmethod
  def _Connect(sock, address, timeout):
    """Try once to connect the socket to the given address.

    This is the function handed to C{utils.Retry} by the constructor.

    @param sock: the socket to connect
    @param address: the address passed to C{sock.connect}
    @param timeout: the timeout set on the socket before connecting
    @raise errors.TimeoutError: if the connect attempt times out
    @raise errors.NoMasterError: if the socket is missing or refuses the
        connection and we are not on the master node (or the master
        cannot be determined)
    @raise errors.PermissionError: on EPERM or EACCES
    @raise utils.RetryAgain: if the attempt should be repeated

    """
    sock.settimeout(timeout)
    try:
      sock.connect(address)
    except socket.timeout as err:
      raise errors.TimeoutError("Connect timed out: %s" % str(err))
    except socket.error as err:
      error_code = err.args[0]
      if error_code in (errno.ENOENT, errno.ECONNREFUSED):
        # Verify if we're actually on the master node before trying
        # again.
        ss = ssconf.SimpleStore()
        try:
          master, myself = ssconf.GetMasterAndMyself(ss=ss)
        except ganeti.errors.ConfigurationError:
          raise errors.NoMasterError(address)
        if master != myself:
          raise errors.NoMasterError(address)
        raise utils.RetryAgain()
      elif error_code in (errno.EPERM, errno.EACCES):
        raise errors.PermissionError(address)
      elif error_code == errno.EAGAIN:
        # Server's socket backlog is full at the moment
        raise utils.RetryAgain()
      raise

  def _CheckSocket(self):
    """Make sure we are connected.

    @raise errors.ProtocolError: if the socket has been closed

    """
    if self.socket is None:
      raise errors.ProtocolError("Connection is closed")

  def Send(self, msg):
    """Send a message.

    This just sends a message and doesn't wait for the response.

    @param msg: the payload; it must not contain the message terminator
    @raise errors.ProtocolError: if the payload contains the terminator
        or the connection is closed
    @raise errors.TimeoutError: if sending times out

    """
    if constants.LUXI_EOM in msg:
      raise errors.ProtocolError("Message terminator found in payload")

    self._CheckSocket()
    try:
      # TODO: sendall is not guaranteed to send everything
      self.socket.sendall(msg + constants.LUXI_EOM)
    except socket.timeout as err:
      raise errors.TimeoutError("Sending timeout: %s" % str(err))

  def Recv(self):
    """Try to receive a message from the socket.

    In case we already have messages queued, we just return from the
    queue. Otherwise, we try to read data with a _rwtimeout network
    timeout, and making sure we don't go over 2x_rwtimeout as a global
    limit.

    @return: the oldest complete message, without its terminator
    @raise errors.TimeoutError: if a read times out or the global limit
        is exceeded
    @raise errors.ConnectionClosedError: if the peer closed the connection

    """
    self._CheckSocket()
    # The deadline is only checked between reads, and each read can block
    # for up to _rwtimeout, hence the overall limit of twice that value
    etime = time.time() + self._rwtimeout
    while not self._msgs:
      if time.time() > etime:
        raise errors.TimeoutError("Extended receive timeout")
      while True:
        try:
          data = self.socket.recv(4096)
        except socket.timeout as err:
          raise errors.TimeoutError("Receive timeout: %s" % str(err))
        except socket.error as err:
          # EAGAIN only means no data was available; read again
          if err.args and err.args[0] == errno.EAGAIN:
            continue
          raise
        break
      if not data:
        raise errors.ConnectionClosedError("Connection closed while reading")
      new_msgs = (self._buffer + data).split(constants.LUXI_EOM)
      # The last element is the (possibly empty) start of a message whose
      # terminator has not arrived yet; keep it for the next read
      self._buffer = new_msgs.pop()
      self._msgs.extend(new_msgs)
    return self._msgs.popleft()

  def Call(self, msg):
    """Send a message and wait for the response.

    This is just a wrapper over Send and Recv.

    """
    self.Send(msg)
    return self.Recv()

  @staticmethod
  def RetryOnNetworkError(fn, on_error, retries=15, wait_on_error=5):
    """Calls a given function, retrying if it fails on a network IO
    exception.

    This allows to re-establish a broken connection and retry an IO operation.

    The function receives one integer argument stating the current retry
    number, 0 being the first call, 1 being the first retry, 2 the second,
    and so on.

    If any exception occurs, on_error is invoked first with the exception given
    as an argument. Then, if the exception is a network exception, the function
    call is retried once more.

    @param fn: the function to call, taking the retry number
    @param on_error: callable invoked with every exception raised by fn
    @param retries: the maximum number of calls made to fn
    @param wait_on_error: base delay in seconds; before retry number k the
        delay is C{wait_on_error * (k - 1)}, so the first retry is immediate
    @return: whatever fn returns on its first successful call

    """
    for try_no in range(retries):
      try:
        return fn(try_no)
      except (socket.error, errors.ConnectionClosedError,
              errors.TimeoutError) as ex:
        on_error(ex)
        # we retry on a network error, unless it's the last try
        if try_no == retries - 1:
          raise
        logging.error("Network error: %s, retring (retry attempt number %d)",
                      ex, try_no + 1)
        time.sleep(wait_on_error * try_no)
      except Exception as ex:
        on_error(ex)
        raise
    assert False # we should never get here

  def Close(self):
    """Close the socket"""
    if self.socket is not None:
      self.socket.close()
      self.socket = None


class FdTransport:
  """Low-level transport class that works on arbitrary file descriptors.

  Unlike L{Transport}, this doesn't use timeouts.
  """

  def __init__(self, fds, timeouts=None): # pylint: disable=W0613
    """Constructor for the FdTransport class.

    @type fds: pair of file descriptors
    @param fds: the file descriptor for reading (the first in the pair)
        and the file descriptor for writing (the second)
    @type timeouts: int
    @param timeouts: unused

    """
    # Both streams are opened unbuffered (buffering=0), i.e. as raw streams
    self._rstream = io.open(fds[0], 'rb', 0)
    self._wstream = io.open(fds[1], 'wb', 0)

    # Data received after the last message terminator seen so far
    self._buffer = ""
    # Complete messages received but not yet handed out by Recv
    self._msgs = collections.deque()

  def _CheckSocket(self):
    """Make sure we are connected.

    @raise errors.ProtocolError: if either stream has been closed

    """
    if self._rstream is None or self._wstream is None:
      raise errors.ProtocolError("Connection is closed")

  def Send(self, msg):
    """Send a message.

    This just sends a message and doesn't wait for the response.

    @param msg: the payload; it must not contain the message terminator
    @raise errors.ProtocolError: if the payload contains the terminator,
        the connection is closed or the message cannot be fully written

    """
    if constants.LUXI_EOM in msg:
      raise errors.ProtocolError("Message terminator found in payload")

    self._CheckSocket()
    pending = msg + constants.LUXI_EOM
    # A raw stream may accept only part of the data in a single write, so
    # keep writing the remainder until the whole message went out
    while pending:
      written = self._wstream.write(pending)
      if not written:
        # None (the descriptor would block) or 0: no progress is possible,
        # report it instead of silently dropping the rest of the message
        raise errors.ProtocolError("Could not write the complete message")
      pending = pending[written:]
    self._wstream.flush()

  def Recv(self):
    """Try to receive a message from the read part of the socket.

    In case we already have messages queued, we just return from the
    queue.

    @return: the oldest complete message, without its terminator
    @raise errors.ProtocolError: if the connection is closed
    @raise errors.ConnectionClosedError: if the read returns no data

    """
    self._CheckSocket()
    while not self._msgs:
      data = self._rstream.read(4096)
      if not data:
        raise errors.ConnectionClosedError("Connection closed while reading")
      new_msgs = (self._buffer + data).split(constants.LUXI_EOM)
      # The last element is the (possibly empty) start of a message whose
      # terminator has not arrived yet; keep it for the next read
      self._buffer = new_msgs.pop()
      self._msgs.extend(new_msgs)
    return self._msgs.popleft()

  def Call(self, msg):
    """Send a message and wait for the response.

    This is just a wrapper over Send and Recv.

    """
    self.Send(msg)
    return self.Recv()

  def Close(self):
    """Close both streams.

    Calling this more than once is harmless. The write stream is closed
    even if closing the read stream fails.

    """
    try:
      if self._rstream is not None:
        self._rstream.close()
        self._rstream = None
    finally:
      if self._wstream is not None:
        self._wstream.close()
        self._wstream = None

  def close(self):
    """Lower-case alias for L{Close}."""
    self.Close()