Commit dfdc4060 authored by Guido Trotter's avatar Guido Trotter

WaitForSocketCondition: rename, handle EINTR

- Rename WaitForSocketCondition to SingleWaitForFdCondition
  - Avoid potentially infinite loop, if we continue to get interrupted
  - Handle eintr correctly
  - Avoid the poller try/finally, as the poller object gets destroyed
    anyway
- Provide a new WaitForFdCondition
  - Using retry, guarantee to continue checking until the timeout
    expires
  - Needs an extra helper class, as it uses retry in a very custom way
    (no sleep happens, because the poller sleeps by itself)
Signed-off-by: default avatarGuido Trotter <ultrotter@google.com>
Reviewed-by: default avatarIustin Pop <iustin@google.com>
parent dcd511c8
......@@ -361,7 +361,7 @@ def SocketOperation(sock, op, arg1, timeout):
else:
wait_for_event = event_poll
event = utils.WaitForSocketCondition(sock, wait_for_event, timeout)
event = utils.WaitForFdCondition(sock, wait_for_event, timeout)
if event is None:
raise HttpSocketTimeout()
......
......@@ -250,8 +250,8 @@ class HttpClientRequestExecutor(http.HttpBase):
if not connected:
# Wait for connection
event = utils.WaitForSocketCondition(self.sock, select.POLLOUT,
self.CONNECT_TIMEOUT)
event = utils.WaitForFdCondition(self.sock, select.POLLOUT,
self.CONNECT_TIMEOUT)
if event is None:
raise http.HttpError("Timeout while connecting to server")
......
......@@ -1497,12 +1497,14 @@ except NameError:
return False
def WaitForSocketCondition(sock, event, timeout):
def SingleWaitForFdCondition(fdobj, event, timeout):
"""Waits for a condition to occur on the socket.
@type sock: socket
@param sock: Wait for events on this socket
@type event: int
Immediately returns at the first interruption.
@type fdobj: integer or object supporting a fileno() method
@param fdobj: entity to wait for events on
@type event: integer
@param event: ORed condition (see select module)
@type timeout: float or None
@param timeout: Timeout in seconds
......@@ -1518,21 +1520,69 @@ def WaitForSocketCondition(sock, event, timeout):
timeout *= 1000
poller = select.poll()
poller.register(sock, event)
poller.register(fdobj, event)
try:
while True:
# TODO: If the main thread receives a signal and we have no timeout, we
# could wait forever. This should check a global "quit" flag or
# something every so often.
io_events = poller.poll(timeout)
if not io_events:
# Timeout
return None
for (_, evcond) in io_events:
if evcond & check:
return evcond
finally:
poller.unregister(sock)
# TODO: If the main thread receives a signal and we have no timeout, we
# could wait forever. This should check a global "quit" flag or something
# every so often.
io_events = poller.poll(timeout)
except select.error, err:
if err[0] != errno.EINTR:
raise
io_events = []
if io_events and io_events[0][1] & check:
return io_events[0][1]
else:
return None
class FdConditionWaiterHelper(object):
"""Retry helper for WaitForFdCondition.
This class contains the retried and wait functions that make sure
WaitForFdCondition can continue waiting until the timeout is actually
expired.
"""
def __init__(self, timeout):
self.timeout = timeout
def Poll(self, fdobj, event):
result = SingleWaitForFdCondition(fdobj, event, self.timeout)
if result is None:
raise RetryAgain()
else:
return result
def UpdateTimeout(self, timeout):
self.timeout = timeout
def WaitForFdCondition(fdobj, event, timeout):
"""Waits for a condition to occur on the socket.
Retries until the timeout is expired, even if interrupted.
@type fdobj: integer or object supporting a fileno() method
@param fdobj: entity to wait for events on
@type event: integer
@param event: ORed condition (see select module)
@type timeout: float or None
@param timeout: Timeout in seconds
@rtype: int or None
@return: None for timeout, otherwise occured conditions
"""
if timeout is not None:
retrywaiter = FdConditionWaiterHelper(timeout)
result = Retry(retrywaiter.Poll, RETRY_REMAINING_TIME, timeout,
args=(fdobj, event), wait_fn=retrywaiter.UpdateTimeout)
else:
result = None
while result is None:
result = SingleWaitForFdCondition(fdobj, event, timeout)
return result
def partition(seq, pred=bool): # # pylint: disable-msg=W0622
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment