Commit 14d4d2f9 authored by Petr Pudlak's avatar Petr Pudlak

Add a transport that works over FDs instead a socket

This allows to set up a client using the Luxi-like protocol over a pipe,
which will be needed for job processes to communicate with their parent
process.

While at it, fix the style of calling __init__ in AbstractStubClient.
Signed-off-by: default avatarPetr Pudlak <pudlak@google.com>
Reviewed-by: default avatarKlaus Aehlig <aehlig@google.com>
parent 2745aa81
......@@ -248,7 +248,8 @@ class AbstractStubClient(AbstractClient):
attribute is defined (in the stub class).
"""
super(AbstractStubClient, self).__init__(timeouts, transport)
super(AbstractStubClient, self).__init__(timeouts=timeouts,
transport=transport)
def _GenericInvoke(self, method, *args):
return self.CallMethod(method, args)
......
......@@ -27,6 +27,7 @@ A transport can send to and receive messages from some endpoint.
import collections
import errno
import io
import socket
import time
......@@ -213,3 +214,84 @@ class Transport:
if self.socket is not None:
self.socket.close()
self.socket = None
class FdTransport:
"""Low-level transport class that works on arbitrary file descriptors.
Unlike L{Transport}, this doesn't use timeouts.
"""
def __init__(self, fds, timeouts=None): # pylint: disable=W0613
"""Constructor for the Client class.
@type fds: pair of file descriptors
@param fds: the file descriptor for reading (the first in the pair)
and the file descriptor for writing (the second)
@type timeouts: int
@param timeouts: unused
"""
self._rstream = io.open(fds[0], 'rb', 0)
self._wstream = io.open(fds[1], 'wb', 0)
self._buffer = ""
self._msgs = collections.deque()
def _CheckSocket(self):
"""Make sure we are connected.
"""
if self._rstream is None or self._wstream is None:
raise errors.ProtocolError("Connection is closed")
def Send(self, msg):
"""Send a message.
This just sends a message and doesn't wait for the response.
"""
if constants.LUXI_EOM in msg:
raise errors.ProtocolError("Message terminator found in payload")
self._CheckSocket()
self._wstream.write(msg + constants.LUXI_EOM)
self._wstream.flush()
def Recv(self):
"""Try to receive a message from the read part of the socket.
In case we already have messages queued, we just return from the
queue.
"""
self._CheckSocket()
while not self._msgs:
data = self._rstream.read(4096)
if not data:
raise errors.ConnectionClosedError("Connection closed while reading")
new_msgs = (self._buffer + data).split(constants.LUXI_EOM)
self._buffer = new_msgs.pop()
self._msgs.extend(new_msgs)
return self._msgs.popleft()
def Call(self, msg):
"""Send a message and wait for the response.
This is just a wrapper over Send and Recv.
"""
self.Send(msg)
return self.Recv()
def Close(self):
"""Close the socket"""
if self._rstream is not None:
self._rstream.close()
self._rstream = None
if self._wstream is not None:
self._wstream.close()
self._wstream = None
def close(self):
self.Close()
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment